Hi all,
The following is the updated code. I'm now getting the avg in a DataFrame, but
using collect() to store the value in a variable and pass it to the final
select query is not working. So avg is currently a DataFrame, not a variable
holding the value.
New code -
from pyspark.sql import SparkSession
import time
from pyspark.sql.functions import split, col

class test:
    spark = SparkSession.builder \
        .appName("Stream_Col_Oper_Spark") \
        .getOrCreate()

    data = spark.readStream.format("kafka") \
        .option("startingOffsets", "latest") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "test1") \
        .load()

    ID = data.select('value') \
        .withColumn('value', data.value.cast("string")) \
        .withColumn("Col1", split(col("value"), ",").getItem(0)) \
        .withColumn("Col2", split(col("value"), ",").getItem(1)) \
        .drop('value')

    ID.createOrReplaceTempView("transformed_Stream_DF")
    avg = spark.sql("select AVG(Col1) as Avg from transformed_Stream_DF")  # .collect()[0][0]
    aggregate_func = spark.sql(
        "select Col1, Col2, Col2/{0} as Col3 from transformed_Stream_DF".format(avg))  # (Col2/(AVG(Col1)) as Col3)")

    # -----------For Console Print-----------
    query1 = avg \
        .writeStream \
        .format("console") \
        .outputMode("complete") \
        .start()

    query = aggregate_func \
        .writeStream \
        .format("console") \
        .start()
    # .outputMode("complete") \
    # -----------Console Print ends-----------

    query1.awaitTermination()

# /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit
#     --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
#     /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py
If I uncomment the .collect()[0][0] in the code above and use it, I get the
following error -
*pyspark.sql.utils.AnalysisException: u'Queries with streaming sources must
be executed with writeStream.start();;\nkafka'*
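Two separate things seem to be going on here. First, collect() is an action, and Structured Streaming rejects actions on a DataFrame that reads from a streaming source (Kafka here); such a DataFrame only produces results through writeStream...start(), which is what the AnalysisException says. Second, with collect() commented out, "...{0}".format(avg) calls str() on the DataFrame object itself, so the SQL text receives its string representation (in PySpark, something like DataFrame[Avg: double]) instead of a number. The plain-Python sketch below does not use Spark; build_scaled_query is a hypothetical helper, not a PySpark API. It only shows that the placeholder has to receive an actual number.

```python
import numbers


def build_scaled_query(avg_value, view="transformed_Stream_DF"):
    """Hypothetical helper: build the Col3 query with a concrete average.

    It refuses anything that is not a real number (for example a DataFrame
    object), because str() of such an object is not a valid SQL literal.
    """
    if isinstance(avg_value, bool) or not isinstance(avg_value, numbers.Real):
        raise TypeError("expected a numeric average, got %s" % type(avg_value).__name__)
    return "select Col1, Col2, Col2/{0} as Col3 from {1}".format(float(avg_value), view)


print(build_scaled_query(3))
# select Col1, Col2, Col2/3.0 as Col3 from transformed_Stream_DF
```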
Any alternative (better) solution to get this job done would be welcome too.
Any help will be greatly appreciated.
Thanks,
Aakash.
On Mon, Apr 2, 2018 at 1:01 PM, Aakash Basu wrote:
> Hi,
>
> This is a very interesting requirement, where I am getting stuck at a few
> places.
>
> *Requirement* -
>
> Col1 Col2
> 1 10
> 2 11
> 3 12
> 4 13
> 5 14
>
>
>
> *I have to calculate the avg of Col1 and then divide Col2 in each row by
> that avg. The Avg should be updated with every new record fed through
> Kafka into Spark Streaming.*
>
> *Avg(Col1) = Running Avg*
> *Col2 = Col2/Avg(Col1)*
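For the five sample rows above, the target output works out as follows. This is plain Python (no Spark), only to pin down the arithmetic: Avg(Col1) = (1+2+3+4+5)/5 = 3, and each Col2 is divided by that value.

```python
# The sample table from the requirement, as (Col1, Col2) pairs.
rows = [(1, 10), (2, 11), (3, 12), (4, 13), (5, 14)]

avg_col1 = sum(c1 for c1, _ in rows) / len(rows)  # (1+2+3+4+5)/5 = 3.0
col3 = [c2 / avg_col1 for _, c2 in rows]          # each Col2 divided by Avg(Col1)

print(avg_col1)                       # 3.0
print([round(v, 4) for v in col3])    # [3.3333, 3.6667, 4.0, 4.3333, 4.6667]
```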
>
>
> *Queries* *-*
>
>
> *1) I am currently trying to simply run an inner query inside a query,
> print the Avg alongside the other column's value, and then do the
> calculation later. But I am getting an error.*
>
> Query -
>
> select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF) as myAvg
> from transformed_Stream_DF t
>
> Error -
>
> pyspark.sql.utils.StreamingQueryException: u'Queries with streaming
> sources must be executed with writeStream.start();
>
> Even though I already have writeStream.start() in my code, it is
> probably throwing the error because of the inner select query (I think
> Spark treats it as a separate query altogether, which requires its own
> writeStream.start()). Any help?
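On a static DataFrame, the scalar subquery in that query is equivalent to computing a one-row aggregate first and pairing it with every row (a cross join against a single-row table). The plain-Python model below captures those semantics on a list; attach_average is a hypothetical name, not a Spark function. In the streaming case both halves read the same unbounded Kafka source, so the aggregate has to be maintained as running state rather than computed once; Spark appears to plan the subquery as a separate query over that source, which would explain the error. Note also that the Spark 2.3 Structured Streaming guide lists streaming aggregations before joins as unsupported, so rewriting the subquery as an explicit join against the streaming AVG is not expected to work in 2.3 either.

```python
def attach_average(rows):
    """Model of `select t.Col2, (Select AVG(Col1) ...) as myAvg from t` on static data.

    The one-row aggregate is computed first, then paired with every row,
    which is what a cross join against a single-row table does.
    """
    if not rows:
        return []
    my_avg = sum(c1 for c1, _ in rows) / len(rows)
    return [(c2, my_avg) for _, c2 in rows]


print(attach_average([(1, 10), (2, 11), (3, 12), (4, 13), (5, 14)]))
# [(10, 3.0), (11, 3.0), (12, 3.0), (13, 3.0), (14, 3.0)]
```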
>
>
> *2) How to go about it?* I have another idea in mind, i.e., querying the
> table to get the avg and storing it in a variable. The second query would
> then simply take that variable and divide the second column by it to
> produce the result. But is that the right approach?
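Approach 2 works for a one-off batch value, but in a streaming job the SQL string (or any Python variable captured in it) is evaluated once, when the query is defined, so the divisor would not follow the running average as new data arrives. The plain-Python model below shows the effect; make_scaler and the two batches are hypothetical illustrations.

```python
def make_scaler(avg_value):
    """Model of approach 2: the average is captured once, at definition time."""
    def scale(col2):
        return col2 / avg_value
    return scale


batch1 = [(1, 10), (2, 11)]
avg_at_start = sum(c1 for c1, _ in batch1) / len(batch1)  # 1.5
scale = make_scaler(avg_at_start)

batch2 = [(9, 12)]                                        # new data arrives later
all_rows = batch1 + batch2
true_avg = sum(c1 for c1, _ in all_rows) / len(all_rows)  # (1+2+9)/3 = 4.0

# The captured divisor is stale: 12/1.5 = 8.0, while 12/4.0 = 3.0.
print(scale(12), 12 / true_avg)
```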
>
> *3) Final question*: How do I do the calculation over the entire data and
> not just the latest batch? Do I need to keep appending it somewhere and
> repeatedly reuse it? My average, and with it every row's Col2 result,
> should change with every new incoming record.
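A calculation over the entire data does need state that survives across micro-batches: a running sum and count for the average and, because every row's result changes whenever the average changes, the rows themselves. The plain-Python model below (RunningAverageScaler is a hypothetical class, not Spark code) keeps that state and casts the split strings to numbers explicitly, as one would with .cast("double") in Spark. In Spark itself, a streaming aggregation in "complete" output mode already maintains the running AVG (as query1 in the newer code does); Spark 2.4 added DataStreamWriter.foreachBatch, which lets per-batch code like this run on each micro-batch; and in 2.3 the "memory" sink with queryName exposes a streaming result as an in-memory table that batch queries can read, though the docs intend it for debugging on small data. Which of these fits is a judgment call outside this sketch.

```python
class RunningAverageScaler:
    """Hypothetical plain-Python model of a stateful streaming job.

    Keeps the sum and count of Col1 across micro-batches plus every row seen
    so far, so each batch can re-emit Col2 / Avg(Col1) for the whole history.
    Note: storing every row means memory grows with the stream.
    """

    def __init__(self):
        self.total = 0.0
        self.count = 0
        self.rows = []  # all (Col1, Col2) pairs seen so far

    def process_batch(self, lines):
        # Each Kafka value looks like "Col1,Col2"; cast both parts to numbers.
        for line in lines:
            parts = line.split(",")
            c1, c2 = float(parts[0]), float(parts[1])
            self.total += c1
            self.count += 1
            self.rows.append((c1, c2))
        if self.count == 0:
            return []
        avg = self.total / self.count
        # Recompute every row, because a new average changes all of them.
        return [(c1, c2, c2 / avg) for c1, c2 in self.rows]


scaler = RunningAverageScaler()
first = scaler.process_batch(["1,10", "2,11", "3,12"])  # avg 2.0 over 3 rows
second = scaler.process_batch(["4,13", "5,14"])         # avg 3.0 over all 5 rows
print(first)
print(second)
```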
>
>
> *Code -*
>
> from pyspark.sql import SparkSession
> import time
> from pyspark.sql.functions import split, col
>
> class test:
>
>     spark = SparkSession.builder \
>         .appName("Stream_Col_Oper_Spark") \
>         .getOrCreate()
>
>     data = spark.readStream.format("kafka") \
>         .option("startingOffsets", "latest") \
>         .option("kafka.bootstrap.servers", "localhost:9092") \
>         .option("subscribe", "test1") \
>         .load()
>
>     ID = data.select('value') \
>         .withColumn('value', data.value.cast("string")) \
>         .withColumn("Col1", split(col("value"), ",").getItem(0)) \
>         .withColumn("Col2", split(col("value"), ",").getItem(1)) \
>         .drop('value')
>
>     ID.createOrReplaceTempView("transformed_Stream_DF")
>     aggregate_func = spark.sql(
>         "select t.Col2, (Select AVG(Col1) as Avg from transformed_Stream_DF) as myAvg "
>         "from transformed_Stream_DF t")  # (Col2/(AVG(Col1)) as Col3)")
>
>     # -----------For Console Print-----------
>
>     query = aggregate_func \
>         .writeStream \
>         .format("console") \
>         .start()
>     # .outputMode("complete") \
>     # -----------Console Print ends-----------
>
>     query.awaitTermination()
>
> # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit
> #     --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
> #     /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py
>
>
>
>
> Thanks,
> Aakash.
>