Hi Panagiotis,
I did that, but it still prints only the result of the first query and then
waits for new data; it never even gets to the second one.
*Data -*
$ nc -lk 9998
1,2
3,4
5,6
7,8
*Result -*
-------------------------------------------
Batch: 0
-------------------------------------------
+----+
|aver|
+----+
| 3.0|
+----+
-------------------------------------------
Batch: 1
-------------------------------------------
+----+
|aver|
+----+
| 4.0|
+----+
*Updated Code -*
from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# The socket source only uses connection options such as host and port;
# a "header" option or a path passed to load() is ignored.
data = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9998) \
    .load()

id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"),
                    split(data.value, ",").getItem(1).alias("col2"))
id_DF.createOrReplaceTempView("ds")

df = spark.sql("select avg(col1) as aver from ds")
df.createOrReplaceTempView("abcd")

wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 from ds")

query2 = df \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .trigger(processingTime='5 seconds') \
    .start()

query = wordCounts \
    .writeStream \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

spark.streams.awaitAnyTermination()
Thanks,
Aakash.
On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis <[email protected]>
wrote:
> Hello Aakash,
>
> When you call query.awaitTermination() you are blocking right there,
> waiting for the current query to stop or throw an exception. In your case
> the second query never even starts.
> Instead, remove all the blocking calls and use
> spark.streams.awaitAnyTermination(), which waits for either query1 or
> query2 to terminate. Make sure you call it after query2.start().
>
> I hope this helps.
>
> Cheers,
> Panagiotis
>
> On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu <[email protected]>
> wrote:
>
>> Any help?
>>
>> I need help urgently. Could someone please clarify this?
>>
>> ---------- Forwarded message ----------
>> From: Aakash Basu <[email protected]>
>> Date: Thu, Apr 5, 2018 at 3:18 PM
>> Subject: [Structured Streaming] More than 1 streaming in a code
>> To: user <[email protected]>
>>
>>
>> Hi,
>>
>> If I have more than one writeStream in my code, all operating on the same
>> readStream data, why is only the first writeStream produced? I want the
>> second one to be printed on the console as well.
>>
>> How can I do that?
>>
>> from pyspark.sql import SparkSession
>> from pyspark.sql.functions import split, col
>>
>> class test:
>>
>>     spark = SparkSession.builder \
>>         .appName("Stream_Col_Oper_Spark") \
>>         .getOrCreate()
>>
>>     data = spark.readStream.format("kafka") \
>>         .option("startingOffsets", "latest") \
>>         .option("kafka.bootstrap.servers", "localhost:9092") \
>>         .option("subscribe", "test1") \
>>         .load()
>>
>>     ID = data.select('value') \
>>         .withColumn('value', data.value.cast("string")) \
>>         .withColumn("Col1", split(col("value"), ",").getItem(0)) \
>>         .withColumn("Col2", split(col("value"), ",").getItem(1)) \
>>         .drop('value')
>>
>>     ID.createOrReplaceTempView("transformed_Stream_DF")
>>
>>     df = spark.sql("select avg(col1) as aver from transformed_Stream_DF")
>>
>>     df.createOrReplaceTempView("abcd")
>>
>>     wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 from transformed_Stream_DF")
>>
>>     # -----------------------#
>>
>>     query1 = df \
>>         .writeStream \
>>         .format("console") \
>>         .outputMode("complete") \
>>         .trigger(processingTime='3 seconds') \
>>         .start()
>>
>>     query1.awaitTermination()
>>     # -----------------------#
>>
>>     query2 = wordCounts \
>>         .writeStream \
>>         .format("console") \
>>         .trigger(processingTime='3 seconds') \
>>         .start()
>>
>>     query2.awaitTermination()
>>
>> # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \
>> #     --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.databricks:spark-csv_2.10:1.0.3 \
>> #     /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py
>>
>> Thanks,
>> Aakash.
>>
>>
>