Hi,
Why are inner queries (scalar subqueries) not allowed in Spark Structured Streaming? Spark treats the
inner query as a separate stream altogether and expects it to be
started with its own writeStream.start().
Why is that?
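The plain-Python sketch below is one way to build intuition for why a scalar subquery is awkward on an unbounded stream. It does not describe Spark's internals. The inner `avg(col1)` keeps changing as new micro-batches arrive, so the `col3` values of rows that were already emitted would need to be recomputed. The helper `ratios_over_prefix` and the sample numbers are hypothetical, chosen only for illustration.

```python
from statistics import mean


def ratios_over_prefix(batches):
    """Recompute col3 = col2 / avg(col1) over all rows seen so far, after each batch.

    Hypothetical helper: returns one snapshot per batch, each holding the col3
    value of every row seen so far, to show that earlier results shift.
    """
    seen = []
    snapshots = []
    for batch in batches:
        seen.extend(batch)
        # The "inner query": average of col1 over everything received so far.
        aver = mean(c1 for c1, _ in seen)
        snapshots.append([c2 / aver for _, c2 in seen])
    return snapshots


# Made-up micro-batches of (col1, col2) pairs.
batches = [[(1, 10), (3, 20)], [(8, 30)]]
for batch_id, snapshot in enumerate(ratios_over_prefix(batches)):
    print(batch_id, snapshot)
```

After the second batch the average moves from 2 to 4, so the first two rows' `col3` values halve. A streaming sink that has already output those rows cannot simply "take them back".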
Error:
pyspark.sql.utils.StreamingQueryException: 'Queries with streaming sources must be executed with writeStream.start();;
textSocket
=== Streaming Query ===
Identifier: [id = f77611ee-ce1c-4b16-8812-0f1afe05562c, runId = 0bb4d880-1a4d-4a6c-8fe0-2b4977ab52d0]
Current Committed Offsets: {}
Current Available Offsets: {TextSocketSource[host: localhost, port: 9998]: 5}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [col1#3, col2#4, (cast(col2#4 as double) / scalar-subquery#8 []) AS col3#9]
:  +- Aggregate [avg(cast(col1#3 as double)) AS aver#7]
:     +- SubqueryAlias ds
:        +- Project [split(value#1, ,)[0] AS col1#3, split(value#1, ,)[1] AS col2#4]
:           +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3ac605bf,socket,List(),None,List(),None,Map(header -> true, host -> localhost, path -> csv, port -> 9998),None), textSocket, [value#1]
+- SubqueryAlias ds
   +- Project [split(value#1, ,)[0] AS col1#3, split(value#1, ,)[1] AS col2#4]
      +- StreamingExecutionRelation TextSocketSource[host: localhost, port: 9998], [value#1]
'
from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredRunningAvg") \
    .getOrCreate()

# Read "col1,col2" lines of text from a socket.
# (The socket source ignores the "header" option and the "csv" path.)
data = spark \
    .readStream \
    .format("socket") \
    .option("header", "true") \
    .option("host", "localhost") \
    .option("port", 9998) \
    .load("csv")

# Split each line into two string columns.
parsed = data.select(split(data.value, ",").getItem(0).alias("col1"),
                     split(data.value, ",").getItem(1).alias("col2"))
parsed.createOrReplaceTempView("ds")

# The scalar subquery (SELECT avg(col1) ... FROM ds) is what triggers the error.
final_DF = spark.sql("""
    SELECT col1, col2, col2 / (SELECT avg(col1) AS aver FROM ds) AS col3
    FROM ds
""")

query = final_DF \
    .writeStream \
    .format("console") \
    .start()

query.awaitTermination()
Thanks,
Aakash.