Re: [Structured Streaming] More than 1 streaming in a code
Hi Gerard,

"If your actual source is Kafka, the original solution of using `spark.streams.awaitAnyTermination` should solve the problem."

I tried everything I could think of, and nothing worked:

1) I ran nc on two different ports, one for each stream. That did not work.
2) I tried the same with Kafka and awaitAnyTermination. That did not help either; the first stream's write kept blocking the second.

(It also seems that inner queries with aggregation don't work in Structured Streaming, since each one expects a separate writeStream.start().)

Any insight, or a direct update to the code, would be helpful.

Thanks,
Aakash.

On Mon 16 Apr, 2018, 9:05 PM Gerard Maas, <gerard.m...@gmail.com> wrote:

> Aakash,
>
> There are two issues here.
> The issue with the code in the first question is that the first query
> blocks, so the code for the second query never gets executed. Panagiotis
> pointed this out correctly.
> In the updated code, the issue is related to netcat (nc) and the way
> Structured Streaming works. As far as I remember, netcat only delivers data
> to the first network connection.
> On the Structured Streaming side, each query opens its own connection.
> As a result, only the first query gets the data.
> If you talked to a TCP server that supports multiple connected clients,
> you would see data in both queries.
>
> If your actual source is Kafka, the original solution of using
> `spark.streams.awaitAnyTermination` should solve the problem.
>
> -kr, Gerard.
Re: [Structured Streaming] More than 1 streaming in a code
Aakash,

There are two issues here.

The issue with the code in the first question is that the first query blocks, so the code for the second query never gets executed. Panagiotis pointed this out correctly.

In the updated code, the issue is related to netcat (nc) and the way Structured Streaming works. As far as I remember, netcat only delivers data to the first network connection. On the Structured Streaming side, each query opens its own connection. As a result, only the first query gets the data. If you talked to a TCP server that supports multiple connected clients, you would see data in both queries.

If your actual source is Kafka, the original solution of using `spark.streams.awaitAnyTermination` should solve the problem.

-kr, Gerard.

On Mon, Apr 16, 2018 at 10:52 AM, Aakash Basu <aakash.spark@gmail.com> wrote:

> Hey Jayesh and Others,
>
> Is there, then, any other way to arrive at a solution for this use case?
>
> Thanks,
> Aakash.
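To make the netcat point above concrete, here is a minimal sketch of a TCP server that accepts several clients and sends every line to all of them, which is the kind of server under which two socket-source queries would both see data. It is a stand-in written for illustration only, not something shipped with Spark or nc; the class name `BroadcastServer` and its methods are hypothetical, and it assumes a local test setup.

```python
import socket
import threading


class BroadcastServer:
    """Hypothetical test helper: sends each line to every connected client."""

    def __init__(self, host="127.0.0.1", port=0):
        # port=0 lets the OS pick a free port; a fixed port such as 9998 also works.
        self._server = socket.create_server((host, port))
        self.port = self._server.getsockname()[1]
        self._clients = []
        self._lock = threading.Lock()
        threading.Thread(target=self._accept_loop, daemon=True).start()

    def _accept_loop(self):
        # Unlike a single-connection listener, keep accepting new clients.
        while True:
            try:
                conn, _ = self._server.accept()
            except OSError:
                return  # listening socket was closed
            with self._lock:
                self._clients.append(conn)

    def client_count(self):
        with self._lock:
            return len(self._clients)

    def send_line(self, line):
        data = (line + "\n").encode("utf-8")
        with self._lock:
            for conn in list(self._clients):
                try:
                    conn.sendall(data)
                except OSError:
                    self._clients.remove(conn)  # drop disconnected clients
```

With something like this listening on port 9998 in place of `nc -lk 9998`, each query's own socket connection should receive the same lines, for example after calling `send_line("1,2")`.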
Re: [Structured Streaming] More than 1 streaming in a code
You could have a really large window.

From: Aakash Basu <aakash.spark@gmail.com>
Date: Monday, April 16, 2018 at 10:56 AM
To: "Lalwani, Jayesh" <jayesh.lalw...@capitalone.com>
Cc: spark receiver <spark.recei...@gmail.com>, Panagiotis Garefalakis <panga...@gmail.com>, user <user@spark.apache.org>
Subject: Re: [Structured Streaming] More than 1 streaming in a code

If I use timestamp-based windowing, my average will not be a global average but one grouped by timestamp window, which is not what I need. I want to recalculate the average of the entire column every time new rows come in, and divide the other column by the updated average. Let me know in case you or anyone else has a solution for this.
Re: [Structured Streaming] More than 1 streaming in a code
If I use timestamp-based windowing, my average will not be a global average but one grouped by timestamp window, which is not what I need. I want to recalculate the average of the entire column every time new rows come in, and divide the other column by the updated average. Let me know in case you or anyone else has a solution for this.

On Mon, Apr 16, 2018 at 7:52 PM, Lalwani, Jayesh <jayesh.lalw...@capitalone.com> wrote:

> You could do it if you had a timestamp in your data. You can use windowed
> operations to divide a value by its own average over a window. However, in
> Structured Streaming, you can only window by timestamp columns. You cannot
> do window aggregations on integers.
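The requirement described above can be spelled out in plain Python rather than Spark: keep a running sum and count of col1 over every row seen so far, and after each batch divide col2 of the new rows by the updated global average. This is only a sketch of the arithmetic, not a Structured Streaming solution; the class name `RunningAverage` is made up for illustration, and the two batches mirror the nc sample data used in this thread (1,2 / 3,4 / 5,6, then 7,8).

```python
class RunningAverage:
    """Tracks the global average of col1 over every row seen so far."""

    def __init__(self):
        self.total = 0.0
        self.count = 0

    def process_batch(self, rows):
        """rows: list of (col1, col2). Returns (average, [(col1, col2, col3)])."""
        for col1, _ in rows:
            self.total += col1
            self.count += 1
        if self.count == 0:
            return None, []  # nothing seen yet, so no average to divide by
        average = self.total / self.count
        # col3 = col2 divided by the average of all col1 values so far
        return average, [(c1, c2, c2 / average) for c1, c2 in rows]


tracker = RunningAverage()
avg0, out0 = tracker.process_batch([(1, 2), (3, 4), (5, 6)])
avg1, out1 = tracker.process_batch([(7, 8)])
print(avg0, avg1)  # 3.0 4.0
print(out1)        # [(7, 8, 2.0)]
```

The two averages agree with the 3.0 and 4.0 reported as console output in this thread. Note that rows from earlier batches are not re-divided when the average changes; whether they should be is a design decision the thread leaves open.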
Re: [Structured Streaming] More than 1 streaming in a code
You could do it if you had a timestamp in your data. You can use windowed operations to divide a value by its own average over a window. However, in Structured Streaming, you can only window by timestamp columns. You cannot do window aggregations on integers.

From: Aakash Basu <aakash.spark@gmail.com>
Date: Monday, April 16, 2018 at 4:52 AM
To: "Lalwani, Jayesh" <jayesh.lalw...@capitalone.com>
Cc: spark receiver <spark.recei...@gmail.com>, Panagiotis Garefalakis <panga...@gmail.com>, user <user@spark.apache.org>
Subject: Re: [Structured Streaming] More than 1 streaming in a code

Hey Jayesh and Others,

Is there, then, any other way to arrive at a solution for this use case?

Thanks,
Aakash.
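A plain-Python sketch of what a timestamp-windowed average computes may make the windowing suggestion above more concrete. It assumes each row carries an epoch-seconds timestamp, which the sample data in this thread does not have; the function name `windowed_ratio` and the row layout are hypothetical, and the code mimics the idea of a tumbling window rather than calling Spark's API.

```python
from collections import defaultdict


def windowed_ratio(rows, window_seconds):
    """rows: list of (ts, col1, col2), with ts in epoch seconds.

    Groups rows into tumbling windows of window_seconds, averages col1
    per window, and returns (ts, col1, col2, col2 / window_average).
    """
    sums = defaultdict(lambda: [0.0, 0])  # window index -> [sum, count]
    for ts, col1, _ in rows:
        bucket = ts // window_seconds  # index of the window this row falls into
        sums[bucket][0] += col1
        sums[bucket][1] += 1
    result = []
    for ts, col1, col2 in rows:
        total, count = sums[ts // window_seconds]
        result.append((ts, col1, col2, col2 / (total / count)))
    return result


rows = [(0, 1, 2), (5, 3, 4), (10, 5, 6), (15, 7, 8)]
small = windowed_ratio(rows, 10)     # two windows: [0, 10) and [10, 20)
large = windowed_ratio(rows, 10**9)  # one window covering every row
```

With the 10-second window, each row is divided by the average of its own window (2.0 or 6.0). With a window wider than the whole time span of the data, every row lands in the same window and is divided by the overall average (4.0), which is the effect a very large window has in this simplified batch setting.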
Re: [Structured Streaming] More than 1 streaming in a code
Hey Jayesh and Others,

Is there, then, any other way to arrive at a solution for this use case?

Thanks,
Aakash.

On Mon, Apr 16, 2018 at 8:11 AM, Lalwani, Jayesh <jayesh.lalw...@capitalone.com> wrote:

> Note that what you are trying to do here is join a streaming data frame
> with an aggregated streaming data frame. As per the documentation, joining
> an aggregated streaming data frame with another streaming data frame is
> not supported.
Re: [Structured Streaming] More than 1 streaming in a code
Note that what you are trying to do here is join a streaming data frame with an aggregated streaming data frame. As per the documentation, joining an aggregated streaming data frame with another streaming data frame is not supported.

From: spark receiver <spark.recei...@gmail.com>
Date: Friday, April 13, 2018 at 11:49 PM
To: Aakash Basu <aakash.spark@gmail.com>
Cc: Panagiotis Garefalakis <panga...@gmail.com>, user <user@spark.apache.org>
Subject: Re: [Structured Streaming] More than 1 streaming in a code

Hi Panagiotis,

I was wondering whether you solved the problem, because I ran into the same issue today. I'd really appreciate it if you could paste the code snippet if it's working.

Thanks.
Re: [Structured Streaming] More than 1 streaming in a code
Hi Panagiotis,

I was wondering whether you solved the problem, because I ran into the same issue today. I'd really appreciate it if you could paste the code snippet if it's working.

Thanks.

> On April 6, 2018, at 7:40 AM, Aakash Basu <aakash.spark@gmail.com> wrote:
>
> Hi Panagiotis,
>
> I did that, but it still prints only the result of the first query and then
> waits for new data; it doesn't even get to the next one.
Re: [Structured Streaming] More than 1 streaming in a code
Hi Panagiotis,

I did that, but it still prints only the result of the first query and then waits for new data; it doesn't even get to the second one.

*Data -*

$ nc -lk 9998
1,2
3,4
5,6
7,8

*Result -*

-------------------------------------------
Batch: 0
-------------------------------------------
+----+
|aver|
+----+
| 3.0|
+----+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+
|aver|
+----+
| 4.0|
+----+

*Updated Code -*

from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

data = spark \
    .readStream \
    .format("socket") \
    .option("header","true") \
    .option("host", "localhost") \
    .option("port", 9998) \
    .load("csv")

id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"),
                    split(data.value, ",").getItem(1).alias("col2"))

id_DF.createOrReplaceTempView("ds")

df = spark.sql("select avg(col1) as aver from ds")

df.createOrReplaceTempView("abcd")

wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 from ds")  # (select aver from abcd)

query2 = df \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .trigger(processingTime='5 seconds') \
    .start()

query = wordCounts \
    .writeStream \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

spark.streams.awaitAnyTermination()

Thanks,
Aakash.

On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis <panga...@gmail.com> wrote:

> Hello Aakash,
>
> When you use query.awaitTermination you are pretty much blocking there
> waiting for the current query to stop or throw an exception. In your case
> the second query will not even start.
> What you could do instead is remove all the blocking calls and use
> spark.streams.awaitAnyTermination instead (waiting for either query1 or
> query2 to terminate). Make sure you do that after the query2.start call.
>
> I hope this helps.
>
> Cheers,
> Panagiotis
Re: [Structured Streaming] More than 1 streaming in a code
Hello Aakash,

When you call query.awaitTermination() you block at that point, waiting for the current query to stop or throw an exception. In your case the second query never even starts.

What you could do is remove all the blocking calls and use spark.streams.awaitAnyTermination() instead, which waits for either query1 or query2 to terminate. Make sure you call it after the query2.start() call.

I hope this helps.

Cheers,
Panagiotis

On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu <aakash.spark@gmail.com> wrote:

> Any help?
>
> Need urgent help. Someone please clarify the doubt?
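A minimal sketch of the pattern described above, assuming the streaming DataFrames have already been built: start every query first, then make a single blocking call. The helper name `start_all_then_wait` and its parameters are hypothetical and not part of the Spark API; only `writeStream`, `format`, `outputMode`, `trigger`, `start` and `spark.streams.awaitAnyTermination` come from PySpark.

```python
def start_all_then_wait(spark, streaming_frames, trigger_seconds=3):
    """Start one console query per streaming DataFrame, then block once.

    Hypothetical helper for illustration:
      spark            -- an existing SparkSession
      streaming_frames -- list of (streaming DataFrame, output mode) tuples
      trigger_seconds  -- processing-time trigger interval in seconds
    """
    queries = []
    for frame, mode in streaming_frames:
        query = (
            frame.writeStream
            .format("console")
            .outputMode(mode)
            .trigger(processingTime=f"{trigger_seconds} seconds")
            .start()  # returns a StreamingQuery immediately; it runs in the background
        )
        queries.append(query)

    # Block only after every query has been started. Calling
    # awaitTermination() on the first query before starting the second
    # would keep the second from ever starting.
    spark.streams.awaitAnyTermination()
    return queries
```

With the names from the original post, this might be called as `start_all_then_wait(spark, [(df, "complete"), (wordCounts, "append")])` in place of the two `awaitTermination()` calls. It only addresses the blocking call; whether each query is itself supported by Structured Streaming is a separate question.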
Fwd: [Structured Streaming] More than 1 streaming in a code
Any help?

Need urgent help. Could someone please clarify this?

-- Forwarded message --
From: Aakash Basu <aakash.spark@gmail.com>
Date: Thu, Apr 5, 2018 at 3:18 PM
Subject: [Structured Streaming] More than 1 streaming in a code
To: user <user@spark.apache.org>
[Structured Streaming] More than 1 streaming in a code
Hi,

If my code has more than one writeStream operating on the same readStream data, why does only the first writeStream produce output? I want the second one printed on the console as well.

How can I do that?

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

class test:

    spark = SparkSession.builder \
        .appName("Stream_Col_Oper_Spark") \
        .getOrCreate()

    data = spark.readStream.format("kafka") \
        .option("startingOffsets", "latest") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "test1") \
        .load()

    ID = data.select('value') \
        .withColumn('value', data.value.cast("string")) \
        .withColumn("Col1", split(col("value"), ",").getItem(0)) \
        .withColumn("Col2", split(col("value"), ",").getItem(1)) \
        .drop('value')

    ID.createOrReplaceTempView("transformed_Stream_DF")

    df = spark.sql("select avg(col1) as aver from transformed_Stream_DF")

    df.createOrReplaceTempView("abcd")

    wordCounts = spark.sql(
        "Select col1, col2, col2/(select aver from abcd) col3 "
        "from transformed_Stream_DF")

    # ---#

    query1 = df \
        .writeStream \
        .format("console") \
        .outputMode("complete") \
        .trigger(processingTime='3 seconds') \
        .start()

    query1.awaitTermination()
    # ---#

    query2 = wordCounts \
        .writeStream \
        .format("console") \
        .trigger(processingTime='3 seconds') \
        .start()

    query2.awaitTermination()

# /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \
#   --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.databricks:spark-csv_2.10:1.0.3 \
#   /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py

Thanks,
Aakash.