Hi Panagiotis,

Did you manage to solve this problem? I ran into the same issue today. I'd
appreciate it very much if you could paste the code snippet, if it's working.

Thanks.


> On April 6, 2018, at 7:40 AM, Aakash Basu <aakash.spark....@gmail.com> wrote:
> 
> Hi Panagiotis,
> 
> I did that, but it still prints only the result of the first query and then
> waits for new data; it doesn't even go on to the next one.
> 
> Data -
> 
> $ nc -lk 9998
> 
> 1,2
> 3,4
> 5,6
> 7,8
> 
> Result -
> 
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +----+
> |aver|
> +----+
> | 3.0|
> +----+
> 
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +----+
> |aver|
> +----+
> | 4.0|
> +----+
> 
> 
> Updated Code -
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import split
> 
> spark = SparkSession \
>     .builder \
>     .appName("StructuredNetworkWordCount") \
>     .getOrCreate()
> 
> data = spark \
>     .readStream \
>     .format("socket") \
>     .option("header","true") \
>     .option("host", "localhost") \
>     .option("port", 9998) \
>     .load("csv")
> 
> 
> id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"), 
> split(data.value, ",").getItem(1).alias("col2"))
> 
> id_DF.createOrReplaceTempView("ds")
> 
> df = spark.sql("select avg(col1) as aver from ds")
> 
> df.createOrReplaceTempView("abcd")
> 
> wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 from ds")  # (select aver from abcd)
> 
> query2 = df \
>     .writeStream \
>     .format("console") \
>     .outputMode("complete") \
>     .trigger(processingTime='5 seconds') \
>     .start()
> 
> query = wordCounts \
>     .writeStream \
>     .format("console") \
>     .trigger(processingTime='5 seconds') \
>     .start()
> 
> spark.streams.awaitAnyTermination()
> 
> 
> Thanks,
> Aakash.
> 
> On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis <panga...@gmail.com> wrote:
> Hello Aakash,
> 
> When you call query.awaitTermination, you block at that point, waiting for
> the current query to stop or throw an exception. In your case the second
> query will not even start.
> What you could do instead is remove all the blocking calls and use
> spark.streams.awaitAnyTermination, which waits for either query1 or query2
> to terminate. Make sure you call it after the query2.start call.
> 
> I hope this helps.
> 
> Cheers,
> Panagiotis
> 
> On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu <aakash.spark....@gmail.com> wrote:
> Any help?
> 
> I need help urgently. Could someone please clarify this?
> 
> ---------- Forwarded message ----------
> From: Aakash Basu <aakash.spark....@gmail.com>
> Date: Thu, Apr 5, 2018 at 3:18 PM
> Subject: [Structured Streaming] More than 1 streaming in a code
> To: user <user@spark.apache.org>
> 
> 
> Hi,
> 
> If I have more than one writeStream in my code, all operating on the same
> readStream data, why does only the first writeStream produce output? I want
> the second one to be printed on the console as well.
> 
> How can I do that?
> 
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import split, col
> 
> class test:
> 
> 
>     spark = SparkSession.builder \
>         .appName("Stream_Col_Oper_Spark") \
>         .getOrCreate()
> 
>     data = spark.readStream.format("kafka") \
>         .option("startingOffsets", "latest") \
>         .option("kafka.bootstrap.servers", "localhost:9092") \
>         .option("subscribe", "test1") \
>         .load()
> 
>     ID = data.select('value') \
>         .withColumn('value', data.value.cast("string")) \
>         .withColumn("Col1", split(col("value"), ",").getItem(0)) \
>         .withColumn("Col2", split(col("value"), ",").getItem(1)) \
>         .drop('value')
> 
>     ID.createOrReplaceTempView("transformed_Stream_DF")
> 
>     df = spark.sql("select avg(col1) as aver from transformed_Stream_DF")
> 
>     df.createOrReplaceTempView("abcd")
> 
>     wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 from transformed_Stream_DF")
> 
> 
>     # -----------------------#
> 
>     query1 = df \
>         .writeStream \
>         .format("console") \
>         .outputMode("complete") \
>         .trigger(processingTime='3 seconds') \
>         .start()
> 
>     query1.awaitTermination()
>     # -----------------------#
> 
>     query2 = wordCounts \
>         .writeStream \
>         .format("console") \
>         .trigger(processingTime='3 seconds') \
>         .start()
> 
>     query2.awaitTermination()
> 
>     # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.databricks:spark-csv_2.10:1.0.3 /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py
> 
> 
> 
> Thanks,
> Aakash.
> 
> 
> 
