Re: Checkpointing in Spark Structured Streaming

2021-03-22 Thread Jungtaek Lim
One more thing I missed: the commit metadata for batch N must be written
"after" all other parts of the checkpoint have been successfully written
for batch N.

So you seem to have found a way to do an asynchronous commit in a "custom
state store provider" - as I commented before, the commit is tied to the
task lifecycle, which means you're no longer able to fail the task once you
make the state store commit asynchronous. There's a rough idea for doing it
properly, but it would require a Spark code change in the checkpoint commit
phase - the driver needs to check the status of the state store commits for
all stateful partitions, and commit only when all state store commits have
succeeded. In detail, the implementation could be somewhat complicated.
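A rough sketch of that driver-side coordination, in plain Python (this is
illustrative only, not actual Spark internals; "commit_partition" is a
hypothetical stand-in for an asynchronous state store commit on one
stateful partition):

```python
from concurrent.futures import ThreadPoolExecutor

def commit_partition(partition_id):
    # Hypothetical placeholder for an asynchronous state store commit
    # on one stateful partition; returns True on success.
    return True

def commit_batch(batch_id, partition_ids):
    """Finalize the batch commit only if every partition's async state
    store commit succeeded -- otherwise the whole batch must fail/retry."""
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(commit_partition, partition_ids))
    if not all(results):
        raise RuntimeError(f"state store commit failed for batch {batch_id}")
    # Only now write the commit metadata for batch N, "after" all other
    # parts of the checkpoint are durable.
    return f"commit metadata written for batch {batch_id}"

print(commit_batch(7, range(4)))
```

The point of the sketch is the ordering: the driver gathers every
partition's commit status first, and the batch-level commit metadata is
written last.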



Re: Checkpointing in Spark Structured Streaming

2021-03-22 Thread Rohit Agrawal
Thank you for the reply. For our use case, it's okay not to have
exactly-once semantics. Given that we don't need exactly-once:
a) Are there any negative implications if one were to use a custom state
store provider which commits asynchronously under the hood?
b) Is there any other option to achieve this without using a custom state
store provider?

Rohit



Re: Checkpointing in Spark Structured Streaming

2021-03-22 Thread Jungtaek Lim
I see some points that make async checkpointing tricky to add in
micro-batch mode; one example is "end-to-end exactly-once": the commit
phase in the sink for batch N can run "after" batch N + 1 has started, so
writes for batch N + 1 can happen before batch N is committed. The state
store checkpoint is tied to the task lifecycle rather than the checkpoint
phase, which also makes it tricky to turn async.

There may still be some spots to optimize in checkpointing, though; one
example is SPARK-34383 [1]. I've found that it reduces checkpointing
latency with object stores by 300+ ms per batch.

Btw, even though S3 is now strongly consistent, that doesn't mean it's
HDFS-compatible, which the default implementation of the SS checkpoint
requires. Atomic rename is not supported, and a rename isn't just a
metadata change (it reads from S3 and writes to S3 again). Performance
would be sub-optimal, and Spark would no longer be able to prevent
concurrent streaming queries from updating the same checkpoint, which
could corrupt it. You need to make sure there's only one streaming query
running against a specific checkpoint.

1. https://issues.apache.org/jira/browse/SPARK-34383
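Since an object store can't provide the atomic-rename guard, one possible
application-level workaround (illustrative only, not a Spark feature) is a
lock file created with exclusive-create semantics, so a second query
against the same checkpoint directory fails fast instead of corrupting it.
Note this sketch assumes a filesystem with O_EXCL semantics; on S3 itself
even this create is not atomic:

```python
import os
import tempfile

def acquire_checkpoint_lock(checkpoint_dir):
    """Best-effort guard: atomically create a lock file; a second query
    against the same checkpoint directory is rejected."""
    lock_path = os.path.join(checkpoint_dir, "_query.lock")
    try:
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        os.close(fd)
        return True
    except FileExistsError:
        return False

ckpt = tempfile.mkdtemp()
print(acquire_checkpoint_lock(ckpt))  # first query acquires the lock -> True
print(acquire_checkpoint_lock(ckpt))  # second query is rejected -> False
```

A real deployment would also need to release the lock on clean shutdown
and handle stale locks after a crash, which is where such schemes get
hard.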



Checkpointing in Spark Structured Streaming

2021-03-22 Thread Rohit Agrawal
Hi,

I have been experimenting with continuous mode and micro-batch mode in
Spark Structured Streaming. When checkpointing to S3 instead of the local
file system, we see that continuous mode shows no change in latency
(expected, due to async checkpointing), while micro-batch mode experiences
degradation, likely due to sync checkpointing.

Is there any way to get async checkpointing in micro-batch mode as well to
improve latency? Could that be done with custom checkpointing logic? Any
pointers / experiences in that direction would be helpful.
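To make the latency difference concrete, here is a back-of-the-envelope
model (the numbers are hypothetical, not measurements): with synchronous
checkpointing the batch is not complete until the checkpoint write is
durable, while an asynchronous design overlaps the write with the next
batch, so the steady-state batch interval is bounded by the slower of the
two rather than their sum.

```python
def batch_latency(process_ms, checkpoint_ms, async_checkpoint):
    # Synchronous: the batch is not done until the checkpoint is durable,
    # so the checkpoint write adds directly to the batch interval.
    # Asynchronous: the checkpoint overlaps the next batch, so in steady
    # state it only matters when it is slower than processing itself.
    if async_checkpoint:
        return max(process_ms, checkpoint_ms)
    return process_ms + checkpoint_ms

# Hypothetical numbers: 200 ms of processing, 300 ms S3 checkpoint write.
print(batch_latency(200, 300, async_checkpoint=False))  # 500
print(batch_latency(200, 300, async_checkpoint=True))   # 300
```

This is exactly why the degradation shows up against S3 but not the local
file system: the checkpoint write term grows, and in sync mode it lands
entirely on the batch interval.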


Re: K8s Integration test is unable to run because of the unavailable libs

2021-03-22 Thread Yikun Jiang
Hey, Yi Wu,

It looks like it's just an apt installation problem: we should run
"apt-get update" to refresh the local package cache before installing
"gnupg".

I opened an issue on JIRA [1] and am trying to fix it in [2]; hope this
helps.

[1] https://issues.apache.org/jira/browse/SPARK-34820
[2] https://github.com/apache/spark/pull/31923
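The fix is essentially this Dockerfile pattern (a sketch of the idea; the
exact lines in [2] may differ):

```dockerfile
# Refresh the package index first -- a stale cached list points at
# package versions (e.g. 2.4.47+dfsg-3+deb10u4) that the mirror has
# already rotated out, producing the 404s in the build log.
RUN apt-get update && \
    apt-get install -y gnupg
```

Chaining update and install in one RUN layer also avoids Docker caching an
old index from a previous build.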

Regards,
Yikun




K8s Integration test is unable to run because of the unavailable libs

2021-03-22 Thread Yi Wu
Hi devs,

It seems like the K8s Integration test is unable to run recently because of
the unavailable libs:

Err:20 http://security.debian.org/debian-security buster/updates/main
amd64 libldap-common all 2.4.47+dfsg-3+deb10u4
  404  Not Found [IP: 151.101.194.132 80]
Err:21 http://security.debian.org/debian-security buster/updates/main
amd64 libldap-2.4-2 amd64 2.4.47+dfsg-3+deb10u4
  404  Not Found [IP: 151.101.194.132 80]
E: Failed to fetch
http://security.debian.org/debian-security/pool/updates/main/o/openldap/libldap-common_2.4.47+dfsg-3+deb10u4_all.deb
 404  Not Found [IP: 151.101.194.132 80]
E: Failed to fetch
http://security.debian.org/debian-security/pool/updates/main/o/openldap/libldap-2.4-2_2.4.47+dfsg-3+deb10u4_amd64.deb
 404  Not Found [IP: 151.101.194.132 80]


I already saw the error in many places, e.g.:


https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40840/console


https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40837/console


https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40715/console


Could someone familiar with K8s please take a look?


Thanks,

Yi