Hi Panagiotis,

Did you manage to solve this problem? I ran into the same issue today. I'd really appreciate it if you could paste the code snippet, if it is working.
Thanks.

> On April 6, 2018, at 7:40 AM, Aakash Basu <aakash.spark....@gmail.com> wrote:
>
> Hi Panagiotis,
>
> I did that, but it still prints the result of the first query and waits for
> new data; it doesn't even go on to the next one.
>
> Data -
>
> $ nc -lk 9998
>
> 1,2
> 3,4
> 5,6
> 7,8
>
> Result -
>
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +----+
> |aver|
> +----+
> | 3.0|
> +----+
>
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +----+
> |aver|
> +----+
> | 4.0|
> +----+
>
>
> Updated Code -
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import split
>
> spark = SparkSession \
>     .builder \
>     .appName("StructuredNetworkWordCount") \
>     .getOrCreate()
>
> data = spark \
>     .readStream \
>     .format("socket") \
>     .option("header","true") \
>     .option("host", "localhost") \
>     .option("port", 9998) \
>     .load("csv")
>
>
> id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"),
>                     split(data.value, ",").getItem(1).alias("col2"))
>
> id_DF.createOrReplaceTempView("ds")
>
> df = spark.sql("select avg(col1) as aver from ds")
>
> df.createOrReplaceTempView("abcd")
>
> wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 from ds")  # (select aver from abcd)
>
> query2 = df \
>     .writeStream \
>     .format("console") \
>     .outputMode("complete") \
>     .trigger(processingTime='5 seconds') \
>     .start()
>
> query = wordCounts \
>     .writeStream \
>     .format("console") \
>     .trigger(processingTime='5 seconds') \
>     .start()
>
> spark.streams.awaitAnyTermination()
>
>
> Thanks,
> Aakash.
>
> On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis <panga...@gmail.com> wrote:
> Hello Aakash,
>
> When you use query.awaitTermination you are pretty much blocking there,
> waiting for the current query to stop or throw an exception. In your case the
> second query will not even start.
> What you could do instead is remove all the blocking calls and use
> spark.streams.awaitAnyTermination (which waits for either query1 or
> query2 to terminate). Make sure you do that after the query2.start call.
>
> I hope this helps.
>
> Cheers,
> Panagiotis
>
> On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu <aakash.spark....@gmail.com> wrote:
> Any help?
>
> Need urgent help. Could someone please clarify this doubt?
>
> ---------- Forwarded message ----------
> From: Aakash Basu <aakash.spark....@gmail.com>
> Date: Thu, Apr 5, 2018 at 3:18 PM
> Subject: [Structured Streaming] More than 1 streaming in a code
> To: user <user@spark.apache.org>
>
>
> Hi,
>
> If I have more than one writeStream in my code, operating on the same
> readStream data, why does only the first writeStream produce output? I want
> the second one to be printed on the console as well.
>
> How can I do that?
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import split, col
>
> class test:
>
>
>     spark = SparkSession.builder \
>         .appName("Stream_Col_Oper_Spark") \
>         .getOrCreate()
>
>     data = spark.readStream.format("kafka") \
>         .option("startingOffsets", "latest") \
>         .option("kafka.bootstrap.servers", "localhost:9092") \
>         .option("subscribe", "test1") \
>         .load()
>
>     ID = data.select('value') \
>         .withColumn('value', data.value.cast("string")) \
>         .withColumn("Col1", split(col("value"), ",").getItem(0)) \
>         .withColumn("Col2", split(col("value"), ",").getItem(1)) \
>         .drop('value')
>
>     ID.createOrReplaceTempView("transformed_Stream_DF")
>
>     df = spark.sql("select avg(col1) as aver from transformed_Stream_DF")
>
>     df.createOrReplaceTempView("abcd")
>
>     wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 from transformed_Stream_DF")
>
>
>     # -----------------------#
>
>     query1 = df \
>         .writeStream \
>         .format("console") \
>         .outputMode("complete") \
>         .trigger(processingTime='3 seconds') \
>         .start()
>
>     query1.awaitTermination()
>     # -----------------------#
>
>     query2 = wordCounts \
>         .writeStream \
>         .format("console") \
>         .trigger(processingTime='3 seconds') \
>         .start()
>
>     query2.awaitTermination()
>
>     # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \
>     #   --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.databricks:spark-csv_2.10:1.0.3 \
>     #   /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py
>
>
> Thanks,
> Aakash.
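
For anyone landing on this thread, here is a minimal sketch of the pattern Panagiotis describes: start every query first, then block once on the session's stream manager. It is an illustration under assumptions, not the code from the thread. The function name run_two_queries, the run_seconds parameter, the app name, and the "doubled" column are all made up for the example, and Spark's built-in "rate" test source stands in for the socket/Kafka input so that no external process is needed. It does not reproduce the scalar-subquery query from the original post.

```python
def run_two_queries(run_seconds=15):
    """Start two console queries on one streaming DataFrame, then block once.

    Hypothetical helper for illustration; names and defaults are assumptions.
    """
    # Imported here so that merely loading this file does not require PySpark.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import avg, col

    spark = SparkSession.builder.appName("TwoQueriesSketch").getOrCreate()

    # Assumption: the "rate" source replaces the socket/Kafka source of the thread.
    # It emits rows with a `timestamp` column and an increasing long `value` column.
    data = spark.readStream.format("rate").option("rowsPerSecond", 2).load()

    # Query 1: a streaming aggregation, written in "complete" output mode.
    averages = data.agg(avg(col("value")).alias("aver"))
    query1 = (averages.writeStream
              .format("console")
              .outputMode("complete")
              .trigger(processingTime="5 seconds")
              .start())  # start() returns immediately; nothing blocks yet

    # Query 2: a row-wise projection, written in the default append mode.
    doubled = data.select(col("value"), (col("value") * 2).alias("doubled"))
    query2 = (doubled.writeStream
              .format("console")
              .trigger(processingTime="5 seconds")
              .start())

    # Block only after BOTH start() calls. With a timeout (in seconds) this
    # returns once any query terminates or the timeout elapses.
    spark.streams.awaitAnyTermination(run_seconds)

    # Shut everything down cleanly.
    for query in (query1, query2):
        query.stop()
    spark.stop()
```

Calling run_two_queries() from a script launched with spark-submit (or from a Python session with PySpark installed) should interleave batches from both queries on the console. The point of the sketch is only the ordering: no per-query awaitTermination() call sits between the two start() calls.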