Hi Panagiotis,

I did that, but it still prints only the result of the first query and then
waits for new data; it doesn't even move on to the next one.

*Data -*

$ nc -lk 9998

1,2
3,4
5,6
7,8

*Result -*

-------------------------------------------
Batch: 0
-------------------------------------------
+----+
|aver|
+----+
| 3.0|
+----+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+
|aver|
+----+
| 4.0|
+----+


*Updated Code -*

from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

data = spark \
    .readStream \
    .format("socket") \
    .option("header","true") \
    .option("host", "localhost") \
    .option("port", 9998) \
    .load("csv")


id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"),
split(data.value, ",").getItem(1).alias("col2"))

id_DF.createOrReplaceTempView("ds")

df = spark.sql("select avg(col1) as aver from ds")

df.createOrReplaceTempView("abcd")

wordCounts = spark.sql(
    "Select col1, col2, col2/(select aver from abcd) col3 from ds")  # (select aver from abcd)

query2 = df \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .trigger(processingTime='5 seconds') \
    .start()

query = wordCounts \
    .writeStream \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

spark.streams.awaitAnyTermination()



Thanks,
Aakash.

On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis <panga...@gmail.com>
wrote:

> Hello Aakash,
>
> When you use query.awaitTermination you are pretty much blocking there
> waiting for the current query to stop or throw an exception. In your case
> the second query will not even start.
> What you could do instead is remove all the blocking calls and use
> spark.streams.awaitAnyTermination instead (waiting for either query1 or
> query2 to terminate). Make sure you do that after the query2.start call.
>
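
A minimal sketch of the ordering described above: start every query first, then make a single blocking call on the session. The helper name start_all_then_wait, the app name, and the built-in "rate" test source are assumptions for illustration only; they are not taken from the original code.

```python
def start_all_then_wait(spark, writers):
    """Start every streaming writer, then block once for the whole session.

    `writers` is any iterable of objects exposing .start(), for example
    DataStreamWriter instances. The helper name is hypothetical.
    """
    # start() returns immediately, so every query is running before we block.
    queries = [writer.start() for writer in writers]
    # Single blocking call: returns as soon as any active query terminates.
    spark.streams.awaitAnyTermination()
    return queries


def main():
    # Imported here so the helper above can be read and tested without Spark.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("TwoQueriesSketch").getOrCreate()

    # Assumption: the "rate" test source stands in for the real input stream.
    data = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

    # Streaming aggregation; needs "complete" (or "update") output mode.
    averages = data.groupBy().avg("value")
    writer1 = averages.writeStream.format("console").outputMode("complete")
    # Plain pass-through of the rows; uses the default append mode.
    writer2 = data.writeStream.format("console")

    # No per-query awaitTermination() between the two start() calls.
    start_all_then_wait(spark, [writer1, writer2])

# Under spark-submit, call main() at the bottom of the script.
```

The point of the helper is only the ordering: a per-query awaitTermination() placed between the two start() calls would block before the second query is ever created.
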
> I hope this helps.
>
> Cheers,
> Panagiotis
>
> On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu <aakash.spark....@gmail.com>
> wrote:
>
>> Any help?
>>
>> I need urgent help. Could someone please clarify this?
>>
>> ---------- Forwarded message ----------
>> From: Aakash Basu <aakash.spark....@gmail.com>
>> Date: Thu, Apr 5, 2018 at 3:18 PM
>> Subject: [Structured Streaming] More than 1 streaming in a code
>> To: user <user@spark.apache.org>
>>
>>
>> Hi,
>>
>> If I have more than one writeStream in my code, all operating on the same
>> readStream data, why is only the first writeStream's output produced? I
>> want the second one to be printed to the console as well.
>>
>> How can I do that?
>>
>> from pyspark.sql import SparkSession
>> from pyspark.sql.functions import split, col
>>
>> class test:
>>
>>
>>     spark = SparkSession.builder \
>>         .appName("Stream_Col_Oper_Spark") \
>>         .getOrCreate()
>>
>>     data = spark.readStream.format("kafka") \
>>         .option("startingOffsets", "latest") \
>>         .option("kafka.bootstrap.servers", "localhost:9092") \
>>         .option("subscribe", "test1") \
>>         .load()
>>
>>     ID = data.select('value') \
>>         .withColumn('value', data.value.cast("string")) \
>>         .withColumn("Col1", split(col("value"), ",").getItem(0)) \
>>         .withColumn("Col2", split(col("value"), ",").getItem(1)) \
>>         .drop('value')
>>
>>     ID.createOrReplaceTempView("transformed_Stream_DF")
>>
>>     df = spark.sql("select avg(col1) as aver from transformed_Stream_DF")
>>
>>     df.createOrReplaceTempView("abcd")
>>
>>     wordCounts = spark.sql(
>>         "Select col1, col2, col2/(select aver from abcd) col3 from transformed_Stream_DF")
>>
>>
>>     # -----------------------#
>>
>>     query1 = df \
>>         .writeStream \
>>         .format("console") \
>>         .outputMode("complete") \
>>         .trigger(processingTime='3 seconds') \
>>         .start()
>>
>>     query1.awaitTermination()
>>     # -----------------------#
>>
>>     query2 = wordCounts \
>>         .writeStream \
>>         .format("console") \
>>         .trigger(processingTime='3 seconds') \
>>         .start()
>>
>>     query2.awaitTermination()
>>
>>     # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \
>>     #   --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.databricks:spark-csv_2.10:1.0.3 \
>>     #   /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py
>>
>>
>>
>>
>> Thanks,
>> Aakash.
>>
>>
>
