Instead of write to console you need to write to memory for it to be queryable
.format("memory")
.queryName("tableName")
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
________________________________
From: Aakash Basu <[email protected]>
Sent: Friday, April 6, 2018 3:22:07 AM
To: user
Subject: Fwd: [Structured Streaming Query] Calculate Running Avg from Kafka
feed using SQL query
Any help?
Need urgent help. Someone please clarify the doubt?
---------- Forwarded message ----------
From: Aakash Basu
<[email protected]<mailto:[email protected]>>
Date: Mon, Apr 2, 2018 at 1:01 PM
Subject: [Structured Streaming Query] Calculate Running Avg from Kafka feed
using SQL query
To: user <[email protected]<mailto:[email protected]>>, "Bowden, Chris"
<[email protected]<mailto:[email protected]>>
Hi,
This is a very interesting requirement, where I am getting stuck at a few
places.
Requirement -
Col1 Col2
1 10
2 11
3 12
4 13
5 14
I have to calculate avg of col1 and then divide each row of col2 by that avg.
And, the Avg should be updated with every new data being fed through Kafka into
Spark Streaming.
Avg(Col1) = Running Avg
Col2 = Col2/Avg(Col1)
Queries -
1) I am currently trying to simply run a inner query inside a query and print
Avg with other Col value and then later do the calculation. But, getting error.
Query -
select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF) as myAvg
from transformed_Stream_DF t
Error -
pyspark.sql.utils.StreamingQueryException: u'Queries with streaming sources
must be executed with writeStream.start();
Even though, I already have writeStream.start(); in my code, it is probably
throwing the error because of the inner select query (I think Spark is assuming
it as another query altogether which require its own writeStream.start. Any
help?
2) How to go about it? I have another point in mind, i.e, querying the table to
get the avg and store it in a variable. In the second query simply pass the
variable and divide the second column to produce appropriate result. But, is it
the right approach?
3) Final question: How to do the calculation over the entire data and not the
latest, do I need to keep appending somewhere and repeatedly use it? My average
and all the rows of the Col2 shall change with every new incoming data.
Code -
from pyspark.sql import SparkSession
import time
from pyspark.sql.functions import split, col
class test:
spark = SparkSession.builder \
.appName("Stream_Col_Oper_Spark") \
.getOrCreate()
data = spark.readStream.format("kafka") \
.option("startingOffsets", "latest") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test1") \
.load()
ID = data.select('value') \
.withColumn('value', data.value.cast("string")) \
.withColumn("Col1", split(col("value"), ",").getItem(0)) \
.withColumn("Col2", split(col("value"), ",").getItem(1)) \
.drop('value')
ID.createOrReplaceTempView("transformed_Stream_DF")
aggregate_func = spark.sql(
"select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF)
as myAvg from transformed_Stream_DF t") # (Col2/(AVG(Col1)) as Col3)")
# -----------For Console Print-----------
query = aggregate_func \
.writeStream \
.format("console") \
.start()
# .outputMode("complete") \
# -----------Console Print ends-----------
query.awaitTermination()
# /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py
Thanks,
Aakash.