Hi Panagiotis, I did that, but it still prints only the result of the first query and waits for new data; it doesn't even go on to the next one.
*Data -*

$ nc -lk 9998
1,2
3,4
5,6
7,8

*Result -*

-------------------------------------------
Batch: 0
-------------------------------------------
+----+
|aver|
+----+
| 3.0|
+----+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+
|aver|
+----+
| 4.0|
+----+

*Updated Code -*

from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Socket source: it takes no path and no "header" option, so load() is bare.
data = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9998) \
    .load()

id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"),
                    split(data.value, ",").getItem(1).alias("col2"))

id_DF.createOrReplaceTempView("ds")

df = spark.sql("select avg(col1) as aver from ds")
df.createOrReplaceTempView("abcd")

wordCounts = spark.sql("select col1, col2, col2/(select aver from abcd) col3 from ds")

# Aggregation query: complete mode, so the running average is re-printed
# on every trigger.
query2 = df \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .trigger(processingTime='5 seconds') \
    .start()

query = wordCounts \
    .writeStream \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

spark.streams.awaitAnyTermination()
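For completeness, here is the bare-bones version of the pattern as I understood it, as a self-contained sketch (the built-in rate source and the app name are just for illustration, not from my actual job):

# Sketch of the non-blocking pattern: start every query first,
# then block exactly once at the very end with awaitAnyTermination.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TwoQueriesSketch").getOrCreate()

# Built-in rate source, used here only so the example runs anywhere.
stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# First sink: the raw rows, default append mode.
q1 = stream.writeStream.format("console").start()

# Second sink: a running aggregate, which needs complete output mode.
q2 = stream.groupBy().avg("value") \
    .writeStream.format("console").outputMode("complete").start()

# The only blocking call, placed after both start() calls.
spark.streams.awaitAnyTermination()

If I have understood correctly, both sinks should print on every trigger with this skeleton, which is why I am puzzled that my version above still only shows the average.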
Thanks,
Aakash.


On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis <panga...@gmail.com> wrote:

> Hello Aakash,
>
> When you use query.awaitTermination you are pretty much blocking there,
> waiting for the current query to stop or throw an exception. In your case
> the second query will never even start.
> What you could do is remove all the blocking calls and use
> spark.streams.awaitAnyTermination instead (waiting for either query1 or
> query2 to terminate). Make sure you do that after the query2.start call.
>
> I hope this helps.
>
> Cheers,
> Panagiotis
>
> On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu <aakash.spark....@gmail.com> wrote:
>
>> Any help?
>>
>> I need urgent help. Could someone please clarify the doubt?
>>
>> ---------- Forwarded message ----------
>> From: Aakash Basu <aakash.spark....@gmail.com>
>> Date: Thu, Apr 5, 2018 at 3:18 PM
>> Subject: [Structured Streaming] More than 1 streaming in a code
>> To: user <user@spark.apache.org>
>>
>> Hi,
>>
>> If I have more than one writeStream in a job, both operating on the same
>> readStream data, why is only the first writeStream's output produced? I
>> want the second one to be printed on the console as well.
>>
>> How to do that?
>>
>> from pyspark.sql import SparkSession
>> from pyspark.sql.functions import split, col
>>
>> class test:
>>
>>     spark = SparkSession.builder \
>>         .appName("Stream_Col_Oper_Spark") \
>>         .getOrCreate()
>>
>>     data = spark.readStream.format("kafka") \
>>         .option("startingOffsets", "latest") \
>>         .option("kafka.bootstrap.servers", "localhost:9092") \
>>         .option("subscribe", "test1") \
>>         .load()
>>
>>     ID = data.select('value') \
>>         .withColumn('value', data.value.cast("string")) \
>>         .withColumn("Col1", split(col("value"), ",").getItem(0)) \
>>         .withColumn("Col2", split(col("value"), ",").getItem(1)) \
>>         .drop('value')
>>
>>     ID.createOrReplaceTempView("transformed_Stream_DF")
>>
>>     df = spark.sql("select avg(col1) as aver from transformed_Stream_DF")
>>     df.createOrReplaceTempView("abcd")
>>
>>     wordCounts = spark.sql("select col1, col2, col2/(select aver from abcd) col3 "
>>                            "from transformed_Stream_DF")
>>
>>     # -----------------------#
>>     query1 = df \
>>         .writeStream \
>>         .format("console") \
>>         .outputMode("complete") \
>>         .trigger(processingTime='3 seconds') \
>>         .start()
>>
>>     query1.awaitTermination()  # blocks here, so query2 below never starts
>>
>>     # -----------------------#
>>     query2 = wordCounts \
>>         .writeStream \
>>         .format("console") \
>>         .trigger(processingTime='3 seconds') \
>>         .start()
>>
>>     query2.awaitTermination()
>>
>> # Submit command:
>> # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \
>> #     --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.databricks:spark-csv_2.10:1.0.3 \
>> #     /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py
>>
>> Thanks,
>> Aakash.