Most probably we will require an additional method, pause(),

https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.streaming.StreamingQuery.html

to allow us to pause the streaming process (as opposed to stop()) and
resume it after changing the parameters. The streaming state needs to be
preserved across the pause.
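
Until something like that exists, the closest approximation I can think of
is a graceful stop() followed by a restart against the same checkpoint
location, since the offsets and state live in the checkpoint. A minimal
sketch, assuming start_query is a hypothetical helper that rebuilds the
readStream/writeStream with the new options and an unchanged
checkpointLocation:

    import time

    def pause_and_resume(query, start_query, new_options):
        # "Pause": stop the running StreamingQuery gracefully. Completed
        # micro-batches are already committed to the checkpoint; an
        # interrupted batch is simply replayed on restart.
        query.stop()
        while query.isActive:
            time.sleep(1)

        # "Resume": restart with the changed options. Because the
        # checkpointLocation is unchanged, Spark resumes from the saved
        # offsets and state rather than from startingOffsets.
        return start_query(new_options)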

HTH



   View my LinkedIn profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 7 Mar 2023 at 17:25, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hm, an interesting proposition. I guess you mean altering one of the
> following parameters in flight:
>
>
>           # requires: from pyspark.sql.functions import col, from_json
>           streamingDataFrame = self.spark \
>                 .readStream \
>                 .format("kafka") \
>                 .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>                 .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>                 .option("group.id", config['common']['appName']) \
>                 .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>                 .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>                 .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>                 .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>                 .option("subscribe", config['MDVariables']['topic']) \
>                 .option("failOnDataLoss", "false") \
>                 .option("includeHeaders", "true") \
>                 .option("startingOffsets", "latest") \
>                 .load() \
>                 .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>
> Ok, one secure way of doing it is through shutting down the streaming
> process gracefully, without loss of data that impacts consumers. The other
> method implies in-flight changes, as suggested by the topic, with zero
> interruptions. Interestingly, one of our clients requested a similar
> solution. As a solutions architect/engineering manager I should come back
> with a few options. I am on the case, so to speak. There is considerable
> interest in Spark Structured Streaming across the board, especially in
> trading systems.
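>
> A minimal sketch of that graceful-shutdown pattern, where an external
> marker file (the path is illustrative, not a Spark convention) tells the
> query to stop cleanly so a restart with new options loses nothing:
>
>     import os
>     import time
>
>     def run_until_marker(query, marker_path="/tmp/stop_streaming"):
>         # Poll for an external "please stop" marker while the query runs.
>         while query.isActive:
>             if os.path.exists(marker_path):
>                 # stop() blocks until the execution thread halts. Committed
>                 # micro-batches live in the checkpoint; an interrupted one
>                 # is replayed on restart, so consumers see no data loss.
>                 query.stop()
>             time.sleep(5)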
>
> HTH
>
>
> On Thu, 16 Feb 2023 at 04:12, hueiyuan su <hueiyua...@gmail.com> wrote:
>
>> *Component*: Spark Structured Streaming
>> *Level*: Advanced
>> *Scenario*: How-to
>>
>> -------------------------
>> *Problem Description*
>> I would like to confirm whether we can directly apply new
>> readStream/writeStream options without stopping a currently running Spark
>> Structured Streaming application. For example, if we just want to adjust
>> the throughput properties of a Kafka readStream, is there a method to
>> adjust them without stopping the application? If you have any ideas,
>> please let me know. I would appreciate your answer.
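>>
>> For concreteness, I mean knobs like maxOffsetsPerTrigger, which caps how
>> many records each micro-batch reads (broker and topic names below are
>> placeholders):
>>
>>     df = (spark.readStream
>>           .format("kafka")
>>           .option("kafka.bootstrap.servers", "broker:9092")
>>           .option("subscribe", "my_topic")
>>           .option("maxOffsetsPerTrigger", "10000")  # per-batch record cap
>>           .load())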
>>
>>
>> --
>> Best Regards,
>>
>> Mars Su
>> *Phone*: 0988-661-013
>> *Email*: hueiyua...@gmail.com
>>
>
