Re: [Spark Structured Streaming] Could we apply new readStream/writeStream options without stopping the Spark application (zero downtime)?

2023-03-09 Thread hueiyuan su
Dear Mich,

Sure, that is a good idea. If we had a pause() function, we could
temporarily pause the stream and adjust its configuration, perhaps from
environment variables. Once those parameters were adjusted, we could resume
the stream so the new values take effect, without stopping the Spark
streaming application.
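
In the meantime, here is a minimal sketch of the graceful-restart workaround
available today (assuming a Kafka source; the environment variable names and
the checkpoint path are placeholders, not from this thread). Stopping the
query and starting a new one on the same checkpoint preserves the Kafka
offsets:

    import os
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("restartable-stream").getOrCreate()

    def start_query():
        # Re-read the tunables on every (re)start; names are placeholders.
        max_offsets = os.environ.get("MAX_OFFSETS_PER_TRIGGER", "10000")
        df = (spark.readStream
              .format("kafka")
              .option("kafka.bootstrap.servers", os.environ["BOOTSTRAP_SERVERS"])
              .option("subscribe", os.environ["TOPIC"])
              .option("maxOffsetsPerTrigger", max_offsets)
              .option("startingOffsets", "latest")
              .load())
        return (df.writeStream
                .format("console")
                .option("checkpointLocation", "/tmp/ckpt")  # same path each run
                .start())

    query = start_query()
    # ... later, when new option values should take effect:
    query.stop()           # stop the query; progress lives in the checkpoint
    query = start_query()  # resumes from the checkpoint with the new values

Per the Spark documentation, rate limits such as maxOffsetsPerTrigger may be
changed between restarts from the same checkpoint, so this covers the
throughput-tuning case, though it is a brief restart rather than true zero
downtime.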

Mich Talebzadeh wrote on Fri, 10 Mar 2023 at 00:26:

> Most probably we will require an additional method, pause():
>
>
> https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.streaming.StreamingQuery.html
>
> to allow us to pause (as opposed to stop()) the streaming process and
> resume after changing the parameters. The state of the stream needs to be
> preserved.
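>
> Hypothetically, usage might look like this (to be clear, pause() and
> resume() do not exist on StreamingQuery today; this is the proposed API,
> not a real one):
>
>     query = streamingDataFrame.writeStream.format("console").start()
>     query.pause()    # proposed: suspend processing, keep state and offsets
>     # ... adjust the readStream/writeStream parameters here ...
>     query.resume()   # proposed: continue from the preserved state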
>
> HTH
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 7 Mar 2023 at 17:25, Mich Talebzadeh wrote:
>
>> hm, interesting proposition. I guess you mean altering one of the
>> following parameters in flight:
>>
>>
>>   streamingDataFrame = self.spark \
>>       .readStream \
>>       .format("kafka") \
>>       .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>>       .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>>       .option("group.id", config['common']['appName']) \
>>       .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>       .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>>       .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>       .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>>       .option("subscribe", config['MDVariables']['topic']) \
>>       .option("failOnDataLoss", "false") \
>>       .option("includeHeaders", "true") \
>>       .option("startingOffsets", "latest") \
>>       .load() \
>>       .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>>
>> OK. One safe way of doing it is to shut down the streaming process
>> gracefully, without the loss of data that would impact consumers, and
>> restart it with the new options. The other method implies in-flight
>> changes, as the topic suggests, with zero interruption. Interestingly, one
>> of our clients requested a similar solution. As solutions architect /
>> engineering manager I should come back with a few options, so I am on the
>> case, so to speak. There is considerable interest in Spark Structured
>> Streaming across the board, especially in trading systems.
>>
>> HTH
>>
>>
>> On Thu, 16 Feb 2023 at 04:12, hueiyuan su  wrote:
>>
>>> *Component*: Spark Structured Streaming
>>> *Level*: Advanced
>>> *Scenario*: How-to
>>>
>>> -
>>> *Problems Description*
>>> I would like to confirm: can we directly apply new options to
>>> readStream/writeStream without stopping a currently running Spark
>>> Structured Streaming application? For example, if we just want to adjust
>>> the throughput properties of a readStream from Kafka, is there a method to
>>> adjust them without stopping the application? If you have any ideas,
>>> please let me know. I would appreciate your answer.
>>>
>>>
>>> --
>>> Best Regards,
>>>
>>> Mars Su
>>> *Phone*: 0988-661-013
>>> *Email*: hueiyua...@gmail.com
>>>
>>

-- 
Best Regards,

Mars Su
*Phone*: 0988-661-013
*Email*: hueiyua...@gmail.com


[Spark] How to find which key type is illegal during the from_json() function

2023-03-08 Thread hueiyuan su
*Component*: Spark
*Level*: Advanced
*Scenario*: How-to

-
*Problems Description*
I have a nested JSON string value in one field of a Spark DataFrame, and I
would like to use from_json() to parse the JSON object. In particular, if
one of the key types does not match our defined struct type, from_json()
returns null. Given that, can we find out which key's type is in error? A
related example follows:

*source dataframe:*
| original_json_string |
| -- |
| "{a:{b:"dn", c:"test"}}" |

P.S. We expect the value of b to be of double type, so we predefined a
struct type for from_json() to use, but it just returns null:

*result dataframe after from_json:*
| from_json(original_json_string) |
| -- |
| null |

In this example, because the value of a has two keys, b and c, can we learn
that it is the value type of b that is in error rather than c? That would
let me check the data quickly instead of just getting null back.
If we would like to achieve this, how could we implement it?
If you have any ideas I will appreciate it, thank you.
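
One possible approach (a hedged sketch, not a built-in Spark feature): parse
the string with an all-string version of the schema first, then cast each
leaf to its expected type; a key whose string value is present but whose
cast comes back null is the one with the illegal type. The schema below
mirrors the example above:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import StructType, StructField, StringType

    spark = SparkSession.builder.appName("from_json-debug").getOrCreate()

    # One row mirroring the example (as valid JSON; b should be a double).
    df = spark.createDataFrame(
        [('{"a": {"b": "dn", "c": "test"}}',)], ["original_json_string"]
    )

    # Step 1: parse with an all-string schema, so no value is nulled out.
    loose = StructType([StructField("a", StructType([
        StructField("b", StringType()),
        StructField("c", StringType()),
    ]))])
    parsed = df.withColumn("j", from_json(col("original_json_string"), loose))

    # Step 2: cast each leaf to its expected type; a non-null string whose
    # cast is null marks the key with the illegal type.
    report = parsed.select(
        (col("j.a.b").isNotNull() & col("j.a.b").cast("double").isNull())
            .alias("b_has_illegal_type"),   # True here: "dn" is not a double
        (col("j.a.c").isNotNull() & col("j.a.c").cast("string").isNull())
            .alias("c_has_illegal_type"),   # False: c is a valid string
    )
    report.show()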

-- 
Best Regards,

Mars Su
*Phone*: 0988-661-013
*Email*: hueiyua...@gmail.com


[Spark Structured Streaming] Does Spark Structured Streaming currently support sinking to AWS Kinesis, and how do we handle hitting Kinesis quotas?

2023-03-05 Thread hueiyuan su
*Component*: Spark Structured Streaming
*Level*: Advanced
*Scenario*: How-to


*Problems Description*
1. I currently would like to use PySpark Structured Streaming to write data
to Kinesis, but there does not seem to be a corresponding connector I can
use. I would like to confirm whether there is another method besides a
custom connector (see the sketch after this list).

2. Because AWS Kinesis has quota limits (like 1 MB/s and 1,000 records/s
per shard), if a Spark Structured Streaming micro-batch is too large, how
can we handle this?
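
For discussion, a hedged sketch of a workaround (assuming boto3 and a
hypothetical stream name; there is no official Kinesis sink in open-source
Structured Streaming, so foreachBatch() hands each micro-batch to the AWS
SDK, chunked to the 500-record put_records() limit, with throttled records
retried):

    import time
    import boto3

    STREAM_NAME = "my-kinesis-stream"   # hypothetical stream name

    def write_to_kinesis(batch_df, batch_id):
        client = boto3.client("kinesis", region_name="us-east-1")
        # collect() is fine for small micro-batches; use foreachPartition at scale.
        rows = batch_df.toJSON().collect()
        for start in range(0, len(rows), 500):           # 500-record API limit
            records = [{"Data": r.encode("utf-8"), "PartitionKey": str(hash(r))}
                       for r in rows[start:start + 500]]
            while records:
                resp = client.put_records(StreamName=STREAM_NAME, Records=records)
                if resp["FailedRecordCount"] == 0:
                    break
                # Keep only the throttled/rejected records and retry them.
                records = [rec for rec, res in zip(records, resp["Records"])
                           if "ErrorCode" in res]
                time.sleep(1)   # crude backoff to stay under shard quotas

    query = (streaming_df.writeStream       # streaming_df: your source DataFrame
             .foreachBatch(write_to_kinesis)
             .option("checkpointLocation", "/tmp/kinesis-ckpt")
             .start())

On the read side, capping the micro-batch with the source's rate limit
option (e.g. maxOffsetsPerTrigger for the Kafka source) helps keep each
batch within the shard quotas.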

-- 
Best Regards,

Mars Su
*Phone*: 0988-661-013
*Email*: hueiyua...@gmail.com


[Spark Structured Streaming] Does Spark Structured Streaming currently support sinking to AWS Kinesis?

2023-02-16 Thread hueiyuan su
*Component*: Spark Structured Streaming
*Level*: Advanced
*Scenario*: How-to


*Problems Description*
I would like to writeStream data to AWS Kinesis with Spark Structured
Streaming, but I cannot find a related connector JAR to use. I want to
check whether writing a stream to AWS Kinesis is fully supported. If you
have any ideas, please let me know. I will appreciate your answer.

-- 
Best Regards,

Mars Su
*Phone*: 0988-661-013
*Email*: hueiyua...@gmail.com


[Spark Structured Streaming] Could we apply new readStream/writeStream options without stopping the Spark application (zero downtime)?

2023-02-15 Thread hueiyuan su
*Component*: Spark Structured Streaming
*Level*: Advanced
*Scenario*: How-to

-
*Problems Description*
I would like to confirm: can we directly apply new options to
readStream/writeStream without stopping a currently running Spark
Structured Streaming application? For example, if we just want to adjust
the throughput properties of a readStream from Kafka, is there a method to
adjust them without stopping the application? If you have any ideas, please
let me know. I would appreciate your answer.


-- 
Best Regards,

Mars Su
*Phone*: 0988-661-013
*Email*: hueiyua...@gmail.com