hm.

I am trying to recall if I am correct  so you should try
outpudeMode('complete') with format('console')

            result = resultMF. \
                     writeStream. \
                     outputMode('complete'). \
                     option("numRows", 1000). \
                     option("truncate", "false"). \
                     format('console'). \
                     option('checkpointLocation', checkpoint_path). \
                     queryName("temperature"). \
                     start()

On another example I have

           result = streamingDataFrame.select( \
                     col("parsed_value.rowkey").alias("rowkey") \
                   , col("parsed_value.timeissued").alias("timeissued") \
                   , col("parsed_value.temperature").alias("temperature")).
\
                     writeStream. \
                     outputMode('append'). \
                     option("truncate", "false"). \
                     foreachBatch(temperatures). \
                     trigger(processingTime='60 seconds'). \
                     option('checkpointLocation', checkpoint_path). \
                     queryName("temperature"). \
                     start()

def temperatures(df, batchId):
    if(len(df.take(1))) > 0:
        df.show(100,False)
        df. persist()
        AvgTemp =
df.select(round(F.avg(col("temperature")))).collect()[0][0]
        df.unpersist()
        now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
        print(f"""Average temperature at {now} from batchId {batchId} is
{AvgTemp} degrees""")
    else:
        print("DataFrame s empty")

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 1 Feb 2022 at 23:45, karan alang <karan.al...@gmail.com> wrote:

> Hello Spark Experts,
>
> I've a simple Structured Streaming program, which reads data from Kafka,
> and writes on the console. This is working in batch mode (i.e spark.read or
> df.write), not not working in streaming mode.
>
> Details are in the stackoverflow
>
>
> https://stackoverflow.com/questions/70948967/structured-streaming-not-writing-records-to-console-when-using-writestream-ba
>
> Any inputs on how to fix/debug this ?
> tia !
>

Reply via email to