Hey Jayesh and others,

Is there, then, any other way to arrive at a solution for this use case?

Thanks,
Aakash.

On Mon, Apr 16, 2018 at 8:11 AM, Lalwani, Jayesh <jayesh.lalw...@capitalone.com> wrote:

Note that what you are trying to do here is join a streaming DataFrame with an aggregated streaming DataFrame. As per the documentation, joining an aggregated streaming DataFrame with another streaming DataFrame is not supported.
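Concretely, the pipeline in the code further down reduces to a streaming aggregate being joined back onto the stream it came from. A minimal sketch of that unsupported shape (socket source, view names, and column names borrowed from the code below; the casts are added for clarity):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import split

    spark = SparkSession.builder.appName("UnsupportedJoinSketch").getOrCreate()

    raw = (spark.readStream.format("socket")
           .option("host", "localhost")
           .option("port", 9998)
           .load())

    # Parse "a,b" lines into two numeric columns.
    ds = raw.select(
        split(raw.value, ",").getItem(0).cast("double").alias("col1"),
        split(raw.value, ",").getItem(1).cast("double").alias("col2"))
    ds.createOrReplaceTempView("ds")

    # An aggregated streaming DataFrame over the same stream:
    spark.sql("select avg(col1) as aver from ds").createOrReplaceTempView("abcd")

    # The scalar subquery below joins that aggregate back onto the raw
    # stream -- the combination the Structured Streaming guide lists as
    # unsupported, so Spark is expected to reject it (typically with an
    # AnalysisException) rather than run it:
    unsupported = spark.sql(
        "select col1, col2, col2 / (select aver from abcd) as col3 from ds")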
From: spark receiver <spark.recei...@gmail.com>
Date: Friday, April 13, 2018 at 11:49 PM
To: Aakash Basu <aakash.spark....@gmail.com>
Cc: Panagiotis Garefalakis <panga...@gmail.com>, user <user@spark.apache.org>
Subject: Re: [Structured Streaming] More than 1 streaming in a code

Hi Panagiotis,

Wondering whether you solved the problem, because I ran into the same issue today. I'd very much appreciate it if you could paste the code snippet if it's working.

Thanks.

On Apr 6, 2018, at 7:40 AM, Aakash Basu <aakash.spark....@gmail.com> wrote:

Hi Panagiotis,

I did that, but it still only prints the result of the first query and then waits for new data; it never even gets to the second one.

Data -

    $ nc -lk 9998
    1,2
    3,4
    5,6
    7,8

Result -

    -------------------------------------------
    Batch: 0
    -------------------------------------------
    +----+
    |aver|
    +----+
    | 3.0|
    +----+

    -------------------------------------------
    Batch: 1
    -------------------------------------------
    +----+
    |aver|
    +----+
    | 4.0|
    +----+

Updated code -

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import split

    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCount") \
        .getOrCreate()

    # Socket source: the "header" option and the "csv" path argument do not
    # apply to this source, so they have been dropped.
    data = spark \
        .readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9998) \
        .load()

    id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"),
                        split(data.value, ",").getItem(1).alias("col2"))

    id_DF.createOrReplaceTempView("ds")

    df = spark.sql("select avg(col1) as aver from ds")

    df.createOrReplaceTempView("abcd")

    wordCounts = spark.sql(
        "select col1, col2, col2 / (select aver from abcd) as col3 from ds")

    query2 = df \
        .writeStream \
        .format("console") \
        .outputMode("complete") \
        .trigger(processingTime='5 seconds') \
        .start()

    query = wordCounts \
        .writeStream \
        .format("console") \
        .trigger(processingTime='5 seconds') \
        .start()

    spark.streams.awaitAnyTermination()

Thanks,
Aakash.

On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis <panga...@gmail.com> wrote:

Hello Aakash,

When you use query.awaitTermination you are pretty much blocking there, waiting for the current query to stop or throw an exception. In your case the second query will not even start. What you could do instead is remove all the blocking calls and use spark.streams.awaitAnyTermination (waiting for either query1 or query2 to terminate). Make sure you do that after the query2.start call.

I hope this helps.

Cheers,
Panagiotis
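To make the skeleton concrete, here is a minimal sketch of that pattern (the built-in rate source is used purely for illustration):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("TwoSinksSketch").getOrCreate()

    # Illustrative source; any streaming DataFrame works the same way.
    stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

    evens = stream.filter("value % 2 = 0")
    odds = stream.filter("value % 2 = 1")

    # Start BOTH queries first -- .start() returns immediately...
    q1 = evens.writeStream.format("console").start()
    q2 = odds.writeStream.format("console").start()

    # ...then block once, waiting for whichever query stops (or fails) first.
    spark.streams.awaitAnyTermination()

The key point is that .start() is non-blocking, so both sinks are registered before anything blocks.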
On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu <aakash.spark....@gmail.com> wrote:

Any help? I need it urgently. Could someone please clarify the doubt?

---------- Forwarded message ----------
From: Aakash Basu <aakash.spark....@gmail.com>
Date: Thu, Apr 5, 2018 at 3:18 PM
Subject: [Structured Streaming] More than 1 streaming in a code
To: user <user@spark.apache.org>

Hi,

If I have more than one writeStream in a piece of code, all operating on the same readStream data, why does only the first writeStream produce output? I want the second one to be printed on the console as well. How can I do that?

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import split, col

    spark = SparkSession.builder \
        .appName("Stream_Col_Oper_Spark") \
        .getOrCreate()

    data = spark.readStream.format("kafka") \
        .option("startingOffsets", "latest") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "test1") \
        .load()

    ID = data.select('value') \
        .withColumn('value', data.value.cast("string")) \
        .withColumn("Col1", split(col("value"), ",").getItem(0)) \
        .withColumn("Col2", split(col("value"), ",").getItem(1)) \
        .drop('value')

    ID.createOrReplaceTempView("transformed_Stream_DF")

    df = spark.sql("select avg(col1) as aver from transformed_Stream_DF")

    df.createOrReplaceTempView("abcd")

    wordCounts = spark.sql(
        "select col1, col2, col2 / (select aver from abcd) as col3 "
        "from transformed_Stream_DF")

    # -----------------------#

    query1 = df \
        .writeStream \
        .format("console") \
        .outputMode("complete") \
        .trigger(processingTime='3 seconds') \
        .start()

    query1.awaitTermination()

    # -----------------------#

    query2 = wordCounts \
        .writeStream \
        .format("console") \
        .trigger(processingTime='3 seconds') \
        .start()

    query2.awaitTermination()

    # Submitted with:
    # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit
    #     --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.databricks:spark-csv_2.10:1.0.3
    #     /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py

Thanks,
Aakash.
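Distilled, the reason only the first sink produces output is where the first blocking call sits (a minimal sketch, with a rate source standing in for the Kafka pipeline above):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("BlockingSketch").getOrCreate()

    # Stand-in for the real Kafka-based pipeline.
    df = spark.readStream.format("rate").load()

    query1 = df.writeStream.format("console").start()
    query1.awaitTermination()   # blocks here indefinitely while query1 runs...

    # ...so execution never reaches this point, and the second sink
    # is never started:
    query2 = df.select("value").writeStream.format("console").start()
    query2.awaitTermination()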