I checked this process of gracefully terminating the topic when the flag is
set to terminate the topic. In this case the topic is called md => market
data. The first two batches and then you set the termination flag on

Topic market data => md, batchId is 236, at 2022-03-01 20:52:00.099259
+------------------------------------+------+-------------------+------+
|rowkey                              |ticker|timeissued         |price |
+------------------------------------+------+-------------------+------+
|edb5e84d-630d-43e4-9e83-9a8e3cd5adef|IBM   |2022-03-01 20:52:01|191.95|
|32351db1-b241-4e87-9d0b-48b81c0e2b85|MSFT  |2022-03-01 20:52:01|27.1  |
|43df704d-7b1d-4adb-8658-643d5a1fb877|BP    |2022-03-01 20:52:02|584.75|
|03e1cdbb-b65f-4306-8e0f-2c383e587e5b|SAP   |2022-03-01 20:52:02|34.46 |
|423b39f1-81f6-456d-b651-2b1bd928036e|MRW   |2022-03-01 20:52:01|235.9 |
|5bb795c8-bc55-4c7c-8ea4-5aac9e6225ef|SBRY  |2022-03-01 20:52:02|396.65|
|99b77350-f0f1-47b3-bb23-2679e6b3be9f|TSCO  |2022-03-01 20:52:02|207.04|
|e7be9fc2-4030-48aa-90db-3ffa68d70636|VOD   |2022-03-01 20:52:02|204.32|
|7e577a26-2ffe-4313-b8f1-17212a78bace|MKS   |2022-03-01 20:52:02|609.5 |
|f3548462-99ee-4551-a284-b6fcc0e1c43b|ORCL  |2022-03-01 20:52:01|32.14 |
+------------------------------------+------+-------------------+------+

Topic market data => md, batchId is 237, at 2022-03-01 20:52:30.080247
+------------------------------------+------+-------------------+------+
|rowkey                              |ticker|timeissued         |price |
+------------------------------------+------+-------------------+------+
|9289d0da-6b6c-48a3-9de8-ade18ecb62b1|VOD   |2022-03-01 20:52:34|238.85|
|cb990ffd-cbeb-4805-a451-bacde22a9a4a|ORCL  |2022-03-01 20:52:34|41.04 |
|7aa818f8-7b48-4941-a663-a881529605f6|BP    |2022-03-01 20:52:34|456.7 |
|03783b5e-f98e-453f-b8b1-9c88d9053bf8|IBM   |2022-03-01 20:52:33|181.07|
|85581bd3-04e8-4598-9ac2-10bb9af4ca4e|MSFT  |2022-03-01 20:52:34|28.14 |
|7113ce60-9721-499a-9d57-c8eea7619aae|TSCO  |2022-03-01 20:52:34|362.3 |
|e424782b-7767-4fa6-b090-d31d01010783|MKS   |2022-03-01 20:52:34|594.5 |
|c158aaa3-4d46-4f35-9927-3c3fe52e8025|MRW   |2022-03-01 20:52:34|273.47|
|d53933ca-07dc-4069-b303-e3ed95590484|SBRY  |2022-03-01 20:52:34|278.05|
|25fde8d8-7740-47b1-a3a3-d2e77aacbb1f|SAP   |2022-03-01 20:52:34|33.25 |
+------------------------------------+------+-------------------+------+

Request terminating streaming process for topic md at 2022-03-01
20:52:39.654067

When this streaming job is terminated orderly,  there is no corruption. The
logic is shown below:


def sendToControl(dfnewtopic, batchId):

    if(len(dfnewtopic.take(1))) > 0:

       queue = dfnewtopic.select(col("queue")).collect()[0][0]

        status = dfnewtopic.select(col("status")).collect()[0][0]

        if((queue == config['MDVariables']['topic']) & (status == 'false')):

          spark_session = s.spark_session(config['common']['appName'])

          active = spark_session.streams.active

          for e in active:

             name = e.name

             if(name == config['MDVariables']['topic']):

                print(f"""Request terminating streaming process for topic
{name} at {datetime.now()} """)

                e.stop()

    else:

        print("DataFrame newtopic is empty")


HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 26 Feb 2022 at 10:44, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Check the thread I forwarded on how to gracefully shutdown spark
> structured streaming
>
> HTH
>
> On Fri, 25 Feb 2022 at 22:31, karan alang <karan.al...@gmail.com> wrote:
>
>> Hello All,
>> I'm running a StructuredStreaming program on GCP Dataproc, which reads
>> data from Kafka, does some processing and puts processed data back into
>> Kafka. The program was running fine, when I killed it (to make minor
>> changes), and then re-started it.
>>
>> It is giving me the error - pyspark.sql.utils.StreamingQueryExceptionace:
>> batch 44 doesn't exist
>>
>> Here is the error:
>>
>> 22/02/25 22:14:08 ERROR 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query [id = 
>> 0b73937f-1140-40fe-b201-cf4b7443b599, runId = 
>> 43c9242d-993a-4b9a-be2b-04d8c9e05b06] terminated with error
>> java.lang.IllegalStateException: batch 44 doesn't exist
>>     at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$populateStartOffsets$1(MicroBatchExecution.scala:286)
>>     at scala.Option.getOrElse(Option.scala:189)
>>     at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:286)
>>     at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:197)
>>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>     at 
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
>>     at 
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
>>     at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
>>     at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
>>     at 
>> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>>     at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
>>     at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
>>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>>     at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
>>     at 
>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>> Traceback (most recent call last):
>>   File 
>> "/tmp/0149aedd804c42f288718e013fb16f9c/StructuredStreaming_GCP_Versa_Sase_gcloud.py",
>>  line 609, in <module>
>>     query.awaitTermination()
>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", 
>> line 101, in awaitTermination
>>   File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", 
>> line 1304, in __call__
>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 
>> 117, in deco
>> pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist
>>
>>
>> Question - what is the cause of this error and how to debug/fix ? Also, I
>> notice that the checkpoint location gets corrupted occasionally, when I do
>> multiple restarts. After checkpoint corruption, it does not return any
>> records
>>
>> For the above issue(as well as when the checkpoint was corrupted), when i
>> cleared the checkpoint location and re-started the program, it went trhough
>> fine.
>>
>> Pls note: while doing readStream, i've enabled failOnDataLoss=false
>>
>> Additional details are in stackoverflow :
>>
>>
>> https://stackoverflow.com/questions/71272328/structuredstreaming-error-pyspark-sql-utils-streamingqueryexception-batch-44
>>
>> any input on this ?
>>
>> tia!
>>
>>
>> --
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>

Reply via email to