[jira] [Commented] (FLINK-35232) Support for retry settings on GCS connector

2024-04-29 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841999#comment-17841999
 ] 

Galen Warren commented on FLINK-35232:
--

[~xtsong] You helped with the original implementation of the GCS 
RecoverableWriter. This proposal – to allow people to adjust the retry 
settings – makes sense to me, but I don't really know what guidance to give 
the people proposing it about potential next steps. How would something like 
this move forward? Thanks.

> Support for retry settings on GCS connector
> ---
>
> Key: FLINK-35232
> URL: https://issues.apache.org/jira/browse/FLINK-35232
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.3, 1.16.2, 1.17.1, 1.19.0, 1.18.1
>Reporter: Vikas M
>Assignee: Ravi Singh
>Priority: Major
>
> https://issues.apache.org/jira/browse/FLINK-32877 is tracking the ability to 
> specify transport options in the GCS connector. While setting the parameters 
> enabled there reduced read timeouts, we still see 503 errors leading to Flink 
> job restarts.
> Thus, in this ticket, we want to specify additional retry settings as noted 
> in 
> [https://cloud.google.com/storage/docs/retry-strategy#customize-retries].
> We need 
> [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
>  methods available to Flink users so that they can customize their 
> deployment. In particular, the following settings seem to be the minimum 
> required to align the GCS timeouts with the job's checkpoint configuration:
>  * 
> [maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__]
>  * 
> [initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__]
>  * 
> [rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__]
>  * 
> [maxRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxRpcTimeout__]
>  * 
> [totalTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getTotalTimeout__]
>  
> Basically, the proposal is to be able to tune the timeout via the multiplier, 
> maxAttempts, and totalTimeout mechanisms.
> All of these config options should be optional, and the defaults should be 
> used for any that are not provided.
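
For reference, a minimal sketch of how these settings can be applied today by going 
directly through the google-cloud-storage Java client (this is not connector code; 
the concrete values are placeholders, and the Flink option keys that would feed them 
are exactly what this ticket needs to define):

{code:java}
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.threeten.bp.Duration;

public class CustomRetrySettingsSketch {
    public static void main(String[] args) {
        // Start from the client defaults and override only the settings
        // discussed in this ticket (values are placeholders).
        RetrySettings retrySettings =
                StorageOptions.getDefaultInstance().getRetrySettings().toBuilder()
                        .setMaxAttempts(10)
                        .setInitialRpcTimeout(Duration.ofSeconds(30))
                        .setRpcTimeoutMultiplier(1.5)
                        .setMaxRpcTimeout(Duration.ofMinutes(2))
                        .setTotalTimeout(Duration.ofMinutes(5))
                        .build();

        // The resulting client applies these retry/timeout settings to its calls.
        Storage storage =
                StorageOptions.newBuilder().setRetrySettings(retrySettings).build().getService();
    }
}
{code}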



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35232) Support for retry settings on GCS connector

2024-04-25 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840942#comment-17840942
 ] 

Galen Warren commented on FLINK-35232:
--

Yes, that's using the RecoverableWriter.

The default retry settings are documented here: [Retry 
strategy|https://cloud.google.com/storage/docs/retry-strategy#java], so as is it 
should already be retrying up to 6 times for up to 50 seconds, I believe. Which 
settings would you be looking to implement? And are you sure nothing else is 
going on that's causing the failures (network connectivity, etc.)?
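
As an aside, a quick way to confirm what the client's defaults actually are is to 
print them from the storage options (a sketch using the google-cloud-storage Java 
client, independent of the connector):

{code:java}
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.storage.StorageOptions;

public class PrintDefaultRetrySettings {
    public static void main(String[] args) {
        // Defaults applied by the client when nothing is overridden.
        RetrySettings defaults = StorageOptions.getDefaultInstance().getRetrySettings();
        System.out.println("maxAttempts  = " + defaults.getMaxAttempts());
        System.out.println("totalTimeout = " + defaults.getTotalTimeout());
    }
}
{code}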

Full disclosure, I worked on RecoverableWriter but I'm not a committer, so I 
can't really ok anything on my own. But being able to adjust these settings 
seems reasonable to me.

> Support for retry settings on GCS connector
> ---
>
> Key: FLINK-35232
> URL: https://issues.apache.org/jira/browse/FLINK-35232
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.3, 1.16.2, 1.17.1, 1.19.0, 1.18.1
>Reporter: Vikas M
>Assignee: Ravi Singh
>Priority: Major
>
> https://issues.apache.org/jira/browse/FLINK-32877 is tracking the ability to 
> specify transport options in the GCS connector. While setting the parameters 
> enabled there reduced read timeouts, we still see 503 errors leading to Flink 
> job restarts.
> Thus, in this ticket, we want to specify additional retry settings as noted 
> in 
> [https://cloud.google.com/storage/docs/retry-strategy#customize-retries].
> We want 
> [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
>  methods available for Flink users so that they can customize their 
> deployment.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35232) Support for retry settings on GCS connector

2024-04-25 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840862#comment-17840862
 ] 

Galen Warren commented on FLINK-35232:
--

In case it's relevant, reading/writing to GCS happens in two different ways in 
Flink.

First, the basic connector wraps a Google-provided Hadoop connector and 
leverages Flink's support for reading/writing to Hadoop file systems.

Second, the RecoverableWriter support leverages the Google Java library 
directly – the extra features associated with RecoverableWriter require this.

Just mentioning this because the linked issue says that Hadoop libraries 
aren't used, which isn't always the case. From your description, I'm not sure 
which mode you're using the connector in.

> Support for retry settings on GCS connector
> ---
>
> Key: FLINK-35232
> URL: https://issues.apache.org/jira/browse/FLINK-35232
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.3, 1.16.2, 1.17.1, 1.19.0, 1.18.1
>Reporter: Vikas M
>Assignee: Ravi Singh
>Priority: Major
>
> https://issues.apache.org/jira/browse/FLINK-32877 is tracking the ability to 
> specify transport options in the GCS connector. While setting the parameters 
> enabled there reduced read timeouts, we still see 503 errors leading to Flink 
> job restarts.
> Thus, in this ticket, we want to specify additional retry settings as noted 
> in 
> [https://cloud.google.com/storage/docs/retry-strategy#customize-retries].
> We want 
> [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
>  methods available for Flink users so that they can customize their 
> deployment.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs

2024-03-26 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830956#comment-17830956
 ] 

Galen Warren commented on FLINK-34696:
--

I'm not a committer so I can't approve anything on my own, but if we nail down 
the proposal I can loop in the person who helped me with the original 
implementation to see how to proceed.

To make sure we're on the same page, are we talking about implementing the more 
efficient composing algorithm (N log N vs. N^2)? I would suggest making the new 
algorithm opt-in.
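
For illustration, here is a rough sketch of what such a hierarchical compose could 
look like with the google-cloud-storage Java client (this is not the existing 
committer code; the class name and intermediate-blob naming are made up). Each byte 
is rewritten roughly log_32(N) times instead of being carried through every 
iteration:

{code:java}
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class HierarchicalCompose {

    /** Composes blobNames (assumed non-empty) into one blob, 32 at a time, layer by layer. */
    static String composeAll(Storage storage, String bucket, List<String> blobNames) {
        List<String> layer = new ArrayList<>(blobNames);
        while (layer.size() > 1) {
            List<String> nextLayer = new ArrayList<>();
            for (int i = 0; i < layer.size(); i += 32) {
                List<String> group = layer.subList(i, Math.min(i + 32, layer.size()));
                String target = "intermediate/" + UUID.randomUUID(); // hypothetical naming
                storage.compose(
                        Storage.ComposeRequest.newBuilder()
                                .setTarget(BlobInfo.newBuilder(bucket, target).build())
                                .addSource(group)
                                .build());
                nextLayer.add(target);
            }
            // Intermediate blobs from the previous layer could be deleted here;
            // the raw component blobs must be kept until the commit succeeds.
            layer = nextLayer;
        }
        return layer.get(0);
    }
}
{code}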

> GSRecoverableWriterCommitter is generating excessive data blobs
> ---
>
> Key: FLINK-34696
> URL: https://issues.apache.org/jira/browse/FLINK-34696
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Simon-Shlomo Poil
>Priority: Major
>
> The `composeBlobs` method in 
> `org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter` is designed to 
> merge multiple small blobs into a single large blob using Google Cloud 
> Storage's compose method. This process is iterative, combining the result 
> from the previous iteration with 31 new blobs until all blobs are merged. 
> Upon completion of the composition, the method proceeds to remove the 
> temporary blobs.
> *Issue:*
> This methodology results in significant, unnecessary data storage consumption 
> during the blob composition process, incurring considerable costs due to 
> Google Cloud Storage pricing models.
> *Example to Illustrate the Problem:*
>  - Initial state: 64 blobs, each 1 GB in size (totaling 64 GB).
>  - After 1st step: 32 blobs are merged into a single blob, increasing total 
> storage to 96 GB (64 original + 32 GB new).
>  - After 2nd step: The newly created 32 GB blob is merged with 31 more blobs, 
> raising the total to 159 GB.
>  - After 3rd step: The final blob is merged, culminating in a total of 223 GB 
> to combine the original 64 GB of data. This results in an overhead of 159 GB.
> *Impact:*
> This inefficiency has a profound impact, especially at scale, where terabytes 
> of data can incur overheads in the petabyte range, leading to unexpectedly 
> high costs. Additionally, we have observed an increase in storage exceptions 
> thrown by the Google Storage library, potentially linked to this issue.
> *Suggested Solution:*
> To mitigate this problem, we propose modifying the `composeBlobs` method to 
> immediately delete source blobs once they have been successfully combined. 
> This change could significantly reduce data duplication and associated costs. 
> However, the implications for data recovery and integrity need careful 
> consideration to ensure that this optimization does not compromise the 
> ability to recover data in case of a failure during the composition process.
> *Steps to Reproduce:*
> 1. Initiate the blob composition process in an environment with a significant 
> number of blobs (e.g., 64 blobs of 1 GB each).
> 2. Observe the temporary increase in data storage as blobs are iteratively 
> combined.
> 3. Note the final amount of data storage used compared to the initial total 
> size of the blobs.
> *Expected Behavior:*
> The blob composition process should minimize unnecessary data storage use, 
> efficiently managing resources to combine blobs without generating excessive 
> temporary data overhead.
> *Actual Behavior:*
> The current implementation results in significant temporary increases in data 
> storage, leading to high costs and potential system instability due to 
> frequent storage exceptions.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs

2024-03-20 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829246#comment-17829246
 ] 

Galen Warren commented on FLINK-34696:
--

I'm not aware of any special considerations in RecoverableWriter for batch 
mode, but to be honest I'm not very knowledgeable about batch mode in general. 
Maybe someone else knows more about that ...

 
{quote}The code above also avoids inserting the intermediate compose blob 
identifier at index 0. That looks expensive if the list is very big (and as far 
as I see not a LinkedList)
{quote}
 

I don't think it's inserting anything, really, as the blobs are immutable. I 
think that just controls the order of the bytes in the newly created blob.
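
To sketch that ordering point with the google-cloud-storage Java client (this is not 
the actual Flink code, just an illustration): the order of the source names in a 
compose request is simply the order in which their bytes appear in the result, and 
no existing blob is modified.

{code:java}
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;

class ComposeOrdering {
    // The previously composed intermediate goes first so its bytes lead the result;
    // the source blobs themselves are never mutated by the compose call.
    static Storage.ComposeRequest next(String bucket, String previousIntermediate, String newComponent) {
        return Storage.ComposeRequest.newBuilder()
                .setTarget(BlobInfo.newBuilder(bucket, previousIntermediate + "-next").build())
                .addSource(previousIntermediate)
                .addSource(newComponent)
                .build();
    }
}
{code}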

> GSRecoverableWriterCommitter is generating excessive data blobs
> ---
>
> Key: FLINK-34696
> URL: https://issues.apache.org/jira/browse/FLINK-34696
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Simon-Shlomo Poil
>Priority: Major
>
> The `composeBlobs` method in 
> `org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter` is designed to 
> merge multiple small blobs into a single large blob using Google Cloud 
> Storage's compose method. This process is iterative, combining the result 
> from the previous iteration with 31 new blobs until all blobs are merged. 
> Upon completion of the composition, the method proceeds to remove the 
> temporary blobs.
> *Issue:*
> This methodology results in significant, unnecessary data storage consumption 
> during the blob composition process, incurring considerable costs due to 
> Google Cloud Storage pricing models.
> *Example to Illustrate the Problem:*
>  - Initial state: 64 blobs, each 1 GB in size (totaling 64 GB).
>  - After 1st step: 32 blobs are merged into a single blob, increasing total 
> storage to 96 GB (64 original + 32 GB new).
>  - After 2nd step: The newly created 32 GB blob is merged with 31 more blobs, 
> raising the total to 159 GB.
>  - After 3rd step: The final blob is merged, culminating in a total of 223 GB 
> to combine the original 64 GB of data. This results in an overhead of 159 GB.
> *Impact:*
> This inefficiency has a profound impact, especially at scale, where terabytes 
> of data can incur overheads in the petabyte range, leading to unexpectedly 
> high costs. Additionally, we have observed an increase in storage exceptions 
> thrown by the Google Storage library, potentially linked to this issue.
> *Suggested Solution:*
> To mitigate this problem, we propose modifying the `composeBlobs` method to 
> immediately delete source blobs once they have been successfully combined. 
> This change could significantly reduce data duplication and associated costs. 
> However, the implications for data recovery and integrity need careful 
> consideration to ensure that this optimization does not compromise the 
> ability to recover data in case of a failure during the composition process.
> *Steps to Reproduce:*
> 1. Initiate the blob composition process in an environment with a significant 
> number of blobs (e.g., 64 blobs of 1 GB each).
> 2. Observe the temporary increase in data storage as blobs are iteratively 
> combined.
> 3. Note the final amount of data storage used compared to the initial total 
> size of the blobs.
> *Expected Behavior:*
> The blob composition process should minimize unnecessary data storage use, 
> efficiently managing resources to combine blobs without generating excessive 
> temporary data overhead.
> *Actual Behavior:*
> The current implementation results in significant temporary increases in data 
> storage, leading to high costs and potential system instability due to 
> frequent storage exceptions.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs

2024-03-19 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828412#comment-17828412
 ] 

Galen Warren commented on FLINK-34696:
--

I think your proposed algorithm is just another way of implementing the "delete 
intermediate blobs that are no longer necessary as soon as possible" idea we've 
considered. You accomplish it by overwriting the staging blob on each 
iteration; a similar effect could be achieved by writing to a new intermediate 
staging blob on each iteration (as is done now) and deleting the old one right 
away (which is not done now; instead, this deletion occurs shortly afterward, 
once the commit succeeds).

Aside: I'm not sure whether it's possible to overwrite the existing staging 
blob like you suggest. The 
[docs|https://cloud.google.com/storage/docs/json_api/v1/objects/compose] for 
the compose operation say:
{quote}Concatenates a list of existing objects into a new object in the same 
bucket. The existing source objects are unaffected by this operation
{quote}
In your proposal, the same staging blob is both the target of the compose 
operation and one of the input blobs to be composed. That _might_ work, but 
would have to be tested to see if it's allowed. If it isn't, writing to a new 
blob and deleting the old one would have essentially the same effect. That's 
almost certainly what happens behind the scenes anyway, since blobs are 
immutable.

Bigger picture, if you're trying to combine millions of immutable blobs 
together in one step, 32 at a time, I don't see how you avoid having lots of 
intermediate composed blobs, one way or another, at least temporarily. The main 
question would be how quickly ones that are no longer needed are discarded.
{quote}To streamline this, we could modify {{GSCommitRecoverable}} to update 
the list of {{{}componentObjectIds{}}}, allowing the removal of blobs already 
appended to the {{stagingBlob}}
{quote}
Not quite following you here; the GSCommitRecoverable is provided as an input 
to the GSRecoverableWriterCommitter, providing the list of "raw" (i.e. 
uncomposed) temporary blobs that need to be composed and committed. If you 
removed those raw blobs from that list as they're added to the staging blob, 
what would that accomplish? The GSCommitRecoverable provided to the 
GSRecoverableWriterCommitter isn't persisted anywhere after the commit succeeds 
or fails, and if the commit were to be retried it would need the complete list 
anyway.
{quote}I notice an other issue with the code - because the storage.compose 
might throw a StorageException. With the current code this would mean the 
intermediate composition blobs are not cleaned up. 

If the code above is implemented, I think there is no longer a need the TTL 
feature since all necessary blobs should be written to a final blob. Any 
leftover blobs post-job completion would indicate a failed state.
{quote}
There is no real way to prevent the orphaning of intermediate blobs in all 
failure scenarios. Even in your proposed algorithm, the staging blob could be 
orphaned if the composition process failed partway through. This is what the 
temp bucket and TTL mechanism are for. It's optional, so you don't have to use 
it, but yes you would have to keep an eye out for orphaned intermediate blobs 
via some other mechanism if you choose not to use it.

Honestly, I think some of your difficulties are coming from trying to combine 
so much data together at once vs. doing it along the way, with commits at 
incremental checkpoints. I was going to suggest you reduce your checkpoint 
interval, to aggregate more frequently, but obviously that doesn't work if 
you're not checkpointing at all.

I do think the RecoverableWriter mechanism is designed to make writing 
predictable and repeatable in checkpoint/recovery situations (hence the name); 
if you're not doing any of that, you may run into some challenges. If you were 
to use checkpoints, you would aggregate more frequently, meaning fewer 
intermediate blobs with shorter lifetimes, and you'd be less likely to run up 
against the 5TB limit, as you would end up with a series of smaller files, 
written at each checkpoint, vs. one huge file.

If we do want to consider changes, here's what I suggest:
 * Test whether GCP allows composition to overwrite an existing blob that is 
also an input to the compose operation. If that works, then we could use that 
technique in the composeBlobs function. As mentioned above, I doubt that this 
actually reduces the number of blobs that exist temporarily – since blobs are 
immutable objects – but it would minimize their lifetime, effectively deleting 
them as soon as possible. If overwriting doesn't work, then we could achieve a 
similar effect by deleting intermediate blobs as soon as they are not needed 
anymore, rather than waiting until the commit succeeds.

 
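If it helps, here is a hedged sketch of that probe with the google-cloud-storage 
Java client (bucket and object names are hypothetical; this is a throwaway test, 
not connector code):

{code:java}
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class ComposeOverwriteProbe {
    public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        String bucket = "my-temp-bucket"; // hypothetical; "staging" and "chunk-000" must already exist
        // Compose the current staging blob together with the next component blob,
        // writing the result back under the staging blob's own name.
        storage.compose(
                Storage.ComposeRequest.newBuilder()
                        .setTarget(BlobInfo.newBuilder(bucket, "staging").build())
                        .addSource("staging", "chunk-000")
                        .build());
        // If this call is rejected, the connector would instead write a new
        // intermediate blob each iteration and delete the previous one right away.
    }
}
{code}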

[jira] [Comment Edited] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs

2024-03-18 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828081#comment-17828081
 ] 

Galen Warren edited comment on FLINK-34696 at 3/18/24 7:35 PM:
---

Re: {*}Composition{*}: My understanding is that Flink recoverable writers are 
supposed to follow a few rules, one of which is that data should not be visible 
in the final destination until a successful commit occurs. From the description 
of the 
[RecoverableWriter|https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html]:
{quote}The RecoverableWriter creates and recovers 
[{{RecoverableFsDataOutputStream}}|https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.html].
 It can be used to write data to a file system in a way that the writing can be 
resumed consistently after a failure and recovery without loss of data or 
possible duplication of bytes.

The streams do not make the files they write to immediately visible, but 
instead write to temp files or other temporary storage. To publish the data 
atomically in the end, the stream offers the 
[{{RecoverableFsDataOutputStream.closeForCommit()}}|https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.html#closeForCommit--]
 method to create a committer that publishes the result.
{quote}
This atomicity is required, afaik, so writing to the final blob along the way 
wouldn't really work.

(Though, as I mentioned above, I do think it would be possible to improve the 
efficiency of the blob composition somewhat in terms of total data storage 
required in cases where there are lots of files to be composed, even when using 
intermediate files.)

Re: *Optimization of the Recovery Mechanism:* Yes, it would be possible to 
delete some of the intermediate composed blobs along the way, i.e. any that are 
not "root" temporary blobs. They are currently deleted at the end of the 
commit.  I suppose that doing it along the way would cause those blobs to live 
a _slightly_ shorter period of time and not overlap as much. Just so I 
understand, which part is the issue? Blob storage costs are prorated pretty granularly, so I 
wouldn't think that short window of time would affect costs that much, but I 
could be wrong about that. Are you running into cost issues, total storage 
limits, something else (?) during the current bursts?

Re: *Handling the 5 TB Blob Limit:* Yes, it would be possible to write two (or 
more) blobs to stay under this limit when composing. I'm not completely sure 
what that means in terms of the atomicity requirement, i.e. I don't think 
there's any way to do multiple atomic writes in GCP, so there would be a 
possibility that data could appear and then disappear, i.e. in the case where the 
write of final blob #1 succeeded, then the write of final blob #2 failed, and 
both were deleted because of the failed commit. But I'm not sure what other 
options exist. It would be interesting to know how the S3 recoverable writer 
handles similar limits.

Re: *Rationale Behind Blob Composition:* see above.

Re: *TTL and temporary bucket:* Could you start without a defined TTL on the 
bucket while gathering the necessary data about the app, and then turn it on 
later? Or, you can just leave it off entirely, in which case the temp files go 
into the main bucket, just in a different location.
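
As a concrete illustration of the lifecycle/TTL idea, here is a sketch with the 
google-cloud-storage Java client (the bucket name is hypothetical, and the age 
should comfortably exceed how far back you might restore from a 
checkpoint/savepoint):

{code:java}
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo.LifecycleRule;
import com.google.cloud.storage.BucketInfo.LifecycleRule.LifecycleAction;
import com.google.cloud.storage.BucketInfo.LifecycleRule.LifecycleCondition;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.util.Collections;

public class TempBucketTtl {
    public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        Bucket tempBucket = storage.get("my-flink-gcs-temp-bucket"); // hypothetical temp bucket

        // Delete any object older than 7 days, so orphaned in-progress and
        // intermediate blobs are eventually cleaned up automatically.
        LifecycleRule deleteAfterSevenDays =
                new LifecycleRule(
                        LifecycleAction.newDeleteAction(),
                        LifecycleCondition.newBuilder().setAge(7).build());

        storage.update(
                tempBucket.toBuilder()
                        .setLifecycleRules(Collections.singletonList(deleteAfterSevenDays))
                        .build());
    }
}
{code}

The temp bucket here is the one pointed to by the connector's temporary-bucket 
option (gs.writer.temporary.bucket.name, if I recall the key correctly); without 
it, the same temporary blobs simply live under a different prefix in the final 
bucket.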


[jira] [Commented] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs

2024-03-18 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828081#comment-17828081
 ] 

Galen Warren commented on FLINK-34696:
--

Re: {*}Composition{*}: My understanding is that Flink recoverable writers are 
supposed to follow a few rules, one of which is that data should not be visible 
in the final destination until a successful commit occurs. From the description 
of the 
[RecoverableWriter|https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html]:
{quote}The RecoverableWriter creates and recovers 
[{{RecoverableFsDataOutputStream}}|https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.html].
 It can be used to write data to a file system in a way that the writing can be 
resumed consistently after a failure and recovery without loss of data or 
possible duplication of bytes.

The streams do not make the files they write to immediately visible, but 
instead write to temp files or other temporary storage. To publish the data 
atomically in the end, the stream offers the 
[{{RecoverableFsDataOutputStream.closeForCommit()}}|https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.html#closeForCommit--]
 method to create a committer that publishes the result.
{quote}
This atomicity is required, afaik, so writing to the final blob along the way 
wouldn't really work.

(Though, as I mentioned above, I do think it would be possible to improve the 
efficiency of the blob composition somewhat in terms of total data storage 
required in cases where there are lots of files to be composed, even when using 
intermediate files.)

Re: *Optimization of the Recovery Mechanism:* Yes, it would be possible to 
delete some of the intermediate composed blobs along the way, i.e. any that are 
not "root" intermediate blobs. They are currently deleted at the end of the 
commit.  I suppose that doing it along the way would cause those blobs to live 
a _slightly_ shorter period of time and not overlap as much. Just so I 
understand, which part is the issue? Blob storage costs are prorated pretty granularly, so I 
wouldn't think that short window of time would affect costs that much, but I 
could be wrong about that. Are you running into cost issues, total storage 
limits, something else (?) during the current bursts?

Re: *Handling the 5 TB Blob Limit:* Yes, it would be possible to write two 
blobs to stay under this limit when composing. I'm not completely sure what 
that means in terms of the atomicity requirement, i.e. I don't think there's 
any way to do multiple atomic writes in GCP, so there would be a possibility 
that data could appear and then disappear, i.e. in the case where the write of final 
blob #1 succeeded, then the write of final blob #2 failed, and both were 
deleted because of the failed commit. But I'm not sure what other options exist. 
It would be interesting to know how the S3 recoverable writer handles similar 
limits.

Re: *Rationale Behind Blob Composition:* see above.

Re: *TTL and temporary bucket:* Could you start without a defined TTL on the 
bucket while gathering the necessary data about the app, and then turn it on 
later? Or, you can just leave it off entirely, in which case the temp files go 
into the main bucket, just in a different location.

> GSRecoverableWriterCommitter is generating excessive data blobs
> ---
>
> Key: FLINK-34696
> URL: https://issues.apache.org/jira/browse/FLINK-34696
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Simon-Shlomo Poil
>Priority: Major
>
> The `composeBlobs` method in 
> `org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter` is designed to 
> merge multiple small blobs into a single large blob using Google Cloud 
> Storage's compose method. This process is iterative, combining the result 
> from the previous iteration with 31 new blobs until all blobs are merged. 
> Upon completion of the composition, the method proceeds to remove the 
> temporary blobs.
> *Issue:*
> This methodology results in significant, unnecessary data storage consumption 
> during the blob composition process, incurring considerable costs due to 
> Google Cloud Storage pricing models.
> *Example to Illustrate the Problem:*
>  - Initial state: 64 blobs, each 1 GB in size (totaling 64 GB).
>  - After 1st step: 32 blobs are merged into a single blob, increasing total 
> storage to 96 GB (64 original + 32 GB new).
>  - After 2nd step: The newly created 32 GB blob is merged with 31 more blobs, 
> raising the total to 

[jira] [Commented] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs

2024-03-15 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827590#comment-17827590
 ] 

Galen Warren commented on FLINK-34696:
--

One more thing, regarding:
{quote}To mitigate this problem, we propose modifying the `composeBlobs` method 
to immediately delete source blobs once they have been successfully combined. 
This change could significantly reduce data duplication and associated costs. 
{quote}
I'm not sure it would be safe to delete the raw temporary blobs (i.e. the 
uncomposed ones) until the commit succeeds, because they are referenced in the 
Recoverable object and would need to be there if a commit were retried. I 
suppose it would be fine to delete truly intermediate blobs along the way 
during the composition process, but these are deleted anyway at the end of the 
commit, so does that buy much? Perhaps it does with very large files.
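
For what it's worth, a hedged sketch (not the current committer code) of what 
"delete intermediates as you go" could look like: the raw component blobs are kept 
until the commit succeeds, while each intermediate composed blob is removed as soon 
as the next one supersedes it.

{code:java}
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import java.util.List;

class EagerIntermediateCleanup {

    /** Sequentially composes components 31 at a time, deleting superseded intermediates. */
    static String compose(Storage storage, String bucket, List<String> components) {
        String previous = null;
        for (int i = 0; i < components.size(); i += 31) {
            String target = "intermediate-" + i; // hypothetical naming
            Storage.ComposeRequest.Builder request =
                    Storage.ComposeRequest.newBuilder()
                            .setTarget(BlobInfo.newBuilder(bucket, target).build());
            if (previous != null) {
                request.addSource(previous); // carry forward the running result
            }
            request.addSource(components.subList(i, Math.min(i + 31, components.size())));
            storage.compose(request.build());
            if (previous != null) {
                storage.delete(BlobId.of(bucket, previous)); // superseded intermediate only
            }
            previous = target;
        }
        return previous;
    }
}
{code}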

 

> GSRecoverableWriterCommitter is generating excessive data blobs
> ---
>
> Key: FLINK-34696
> URL: https://issues.apache.org/jira/browse/FLINK-34696
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Simon-Shlomo Poil
>Priority: Major
>
> The `composeBlobs` method in 
> `org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter` is designed to 
> merge multiple small blobs into a single large blob using Google Cloud 
> Storage's compose method. This process is iterative, combining the result 
> from the previous iteration with 31 new blobs until all blobs are merged. 
> Upon completion of the composition, the method proceeds to remove the 
> temporary blobs.
> *Issue:*
> This methodology results in significant, unnecessary data storage consumption 
> during the blob composition process, incurring considerable costs due to 
> Google Cloud Storage pricing models.
> *Example to Illustrate the Problem:*
>  - Initial state: 64 blobs, each 1 GB in size (totaling 64 GB).
>  - After 1st step: 32 blobs are merged into a single blob, increasing total 
> storage to 96 GB (64 original + 32 GB new).
>  - After 2nd step: The newly created 32 GB blob is merged with 31 more blobs, 
> raising the total to 159 GB.
>  - After 3rd step: The final blob is merged, culminating in a total of 223 GB 
> to combine the original 64 GB of data. This results in an overhead of 159 GB.
> *Impact:*
> This inefficiency has a profound impact, especially at scale, where terabytes 
> of data can incur overheads in the petabyte range, leading to unexpectedly 
> high costs. Additionally, we have observed an increase in storage exceptions 
> thrown by the Google Storage library, potentially linked to this issue.
> *Suggested Solution:*
> To mitigate this problem, we propose modifying the `composeBlobs` method to 
> immediately delete source blobs once they have been successfully combined. 
> This change could significantly reduce data duplication and associated costs. 
> However, the implications for data recovery and integrity need careful 
> consideration to ensure that this optimization does not compromise the 
> ability to recover data in case of a failure during the composition process.
> *Steps to Reproduce:*
> 1. Initiate the blob composition process in an environment with a significant 
> number of blobs (e.g., 64 blobs of 1 GB each).
> 2. Observe the temporary increase in data storage as blobs are iteratively 
> combined.
> 3. Note the final amount of data storage used compared to the initial total 
> size of the blobs.
> *Expected Behavior:*
> The blob composition process should minimize unnecessary data storage use, 
> efficiently managing resources to combine blobs without generating excessive 
> temporary data overhead.
> *Actual Behavior:*
> The current implementation results in significant temporary increases in data 
> storage, leading to high costs and potential system instability due to 
> frequent storage exceptions.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs

2024-03-15 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827568#comment-17827568
 ] 

Galen Warren edited comment on FLINK-34696 at 3/15/24 5:49 PM:
---

No, this just moves the storage of the temporary files to a different bucket so 
that a lifecycle policy can be applied to them. The intermediate blobs still 
have to be composed into the final blob either way. And yes there is one extra 
copy if you use the temp bucket, but the benefit is you won't orphan temporary 
files (more below).

Upon rereading and looking back at the code (it's been a while!), I may have 
misunderstood the initial issue. There could be a couple things going on here.

1) There could be orphaned temporary blobs sticking around, because they were 
written at one point and then due to restores from check/savepoints, they were 
"forgotten". Writing all intermediate files to a temporary bucket and then 
applying a lifecycle policy to that bucket would address that issue. It does 
come at the cost of one extra copy operation; in GCP blobs cannot be composed 
into a different bucket, so using the temporary bucket incurs one extra copy 
that doesn't occur if the temporary bucket is the same as the final bucket.

2) There could be large numbers of temporary blobs being composed together, 
i.e. the writing pattern is such that large numbers of writes occur between 
checkpoints. Since there's a limit to how many blobs can be composed together 
at one time (32), there's no way, in principle, to avoid a temporary blob's data 
being involved in more than one compose operation, but I do think the 
composition could be optimized differently.

I think #2 is the issue here, not #1? Though one should also be careful about 
orphaned blobs if state is ever restored from check/savepoints.

Regarding optimization: Currently, the composing of blobs occurs 
[here|https://github.com/apache/flink/blob/f6e1b493bd6292a87efd130a0e76af8bd750c1c9/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java#L131].

It does it in a way that minimizes the number of compose operations but not 
necessarily the total volume of data involved in those compose operations, 
which I think is the issue here.

Consider the situation where there are 94 temporary blobs to be composed into a 
committed blob. As it stands now, the first 32 would be composed into 1 blob, 
leaving 1 + (94 - 32) = 63 blobs remaining. This process would be repeated, 
composing the first 32 again into 1, leaving 1 + (63 - 32) = 32 blobs. Then 
these remaining 32 blobs would be composed into 1 final blob. So, 3 total 
compose operations.

This could be done another way – the first 32 blobs could be composed into 1 
blob, the second 32 blobs could be composed into 1 blob, and the third 30 blobs 
could be composed into 1 blob, resulting in three intermediate blobs that would 
then be composed into the final blob. So, 4 total compose operations in this 
case. Still potentially recursive with large numbers of blobs due to the 
32-blob limit, but this would avoid the piling up of data in the first blob 
when there are large numbers of temporary blobs.

Even though the second method would generally involve more total compose 
operations than the first, it would minimize the total bytes being composed. I 
doubt the difference would be significant for small numbers of temporary blobs 
and could help with large numbers of temporary blobs. That would seem like a 
reasonable enhancement to me.
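
For illustration, a rough sketch of that batched approach (hypothetical helper 
interface; not the actual GSRecoverableWriterCommitter code):
{code:java}
import java.util.ArrayList;
import java.util.List;

public class BatchedComposeSketch {

    // GCS allows at most 32 source blobs per compose request.
    private static final int MAX_COMPOSE = 32;

    /** Hypothetical helper that composes the given blobs into one new blob and returns its name. */
    interface Composer {
        String compose(List<String> sourceBlobNames);
    }

    /** Composes disjoint groups of up to 32 blobs per level, then composes the level results. */
    static String composeAll(List<String> blobs, Composer composer) {
        while (blobs.size() > 1) {
            List<String> nextLevel = new ArrayList<>();
            for (int i = 0; i < blobs.size(); i += MAX_COMPOSE) {
                List<String> group = blobs.subList(i, Math.min(i + MAX_COMPOSE, blobs.size()));
                // Each blob's bytes are read at most once per level, instead of the
                // first composed blob growing and being re-read on every iteration.
                nextLevel.add(group.size() == 1 ? group.get(0) : composer.compose(group));
            }
            blobs = nextLevel;
        }
        return blobs.get(0);
    }
}
{code}
With the 94-blob example above, this performs 4 compose calls and no source byte is 
composed more than twice.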


was (Author: galenwarren):
No, this just moves the storage of the temporary files to a different bucket so 
that a lifecycle policy can be applied to them. The intermediate blobs still 
have to be composed into the final blob either way. And yes there is one extra 
copy if you use the temp bucket, but the benefit is you won't orphan temporary 
files (more below).

Upon rereading and looking back at the code (it's been a while!), I may have 
misunderstood the initial issue. There could be a couple things going on here.

1) There could be orphaned temporary blobs sticking around, because they were 
written at one point and then due to restores from check/savepoints, they were 
"forgotten". Writing all intermediate files to a temporary bucket and then 
applying a lifecycle policy to that bucket would address that issue. It does 
come at the cost of one extra copy operation; in GCP blobs cannot be composed 
into a different bucket, so using the temporary bucket incurs one extra copy 
that doesn't occur if the temporary bucket is the same as the final bucket.

2) There could be large numbers of temporary blobs being composed together, 
i.e. the writing pattern is such that large numbers of writes occur between 
checkpoints. Since there's a limit to how many blobs can be composed together 
at one time (32), there's no way to avoid a temporary 

[jira] [Commented] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs

2024-03-15 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827568#comment-17827568
 ] 

Galen Warren commented on FLINK-34696:
--

No, this just moves the storage of the temporary files to a different bucket so 
that a lifecycle policy can be applied to them. The intermediate blobs still 
have to be composed into the final blob either way. And yes there is one extra 
copy if you use the temp bucket, but the benefit is you won't orphan temporary 
files (more below).

Upon rereading and looking back at the code (it's been a while!), I may have 
misunderstood the initial issue. There could be a couple things going on here.

1) There could be orphaned temporary blobs sticking around, because they were 
written at one point and then due to restores from check/savepoints, they were 
"forgotten". Writing all intermediate files to a temporary bucket and then 
applying a lifecycle policy to that bucket would address that issue. It does 
come at the cost of one extra copy operation; in GCP blobs cannot be composed 
into a different bucket, so using the temporary bucket incurs one extra copy 
that doesn't occur if the temporary bucket is the same as the final bucket.

2) There could be large numbers of temporary blobs being composed together, 
i.e. the writing pattern is such that large numbers of writes occur between 
checkpoints. Since there's a limit to how many blobs can be composed together 
at one time (32), there's no way, in principle, to avoid a temporary blob's data 
being involved in more than one compose operation, but I do think the 
composition could be optimized differently.

I think #2 is the issue here, not #1? Though one should also be careful about 
orphaned blobs if state is ever restored from check/savepoints.

Regarding optimization: Currently, the composing of blobs occurs 
[here|https://github.com/apache/flink/blob/f6e1b493bd6292a87efd130a0e76af8bd750c1c9/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java#L131].

It does it in a way that minimizes the number of compose operations but not 
necessarily the total volume of data involved in those compose operations, 
which I think is the issue here.

Consider the situation where there are 94 temporary blobs to be composed into a 
committed blob. As it stands now, the first 32 would be composed into 1 blob, 
leaving 1 + (94 - 32) = 63 blobs remaining. This process would be repeated, 
composing the first 32 again into 1, leaving 1 + (63 - 32) = 32 blobs. Then 
these remaining 32 blobs would be composed into 1 final blob. So, 3 total 
compose operations.

This could be done another way – the first 32 blobs could be composed into 1 
blob, the second 32 blobs could be composed into 1 blob, and the third 30 blobs 
could be composed into 1 blob, resulting in three intermediate blobs that would 
then be composed into the final blob. So, 4 total compose operations in this 
case. Still potentially recursive with large numbers of blobs due to the 
32-blob limit, but this would avoid the piling up of data in the first blob 
when there are large numbers of temporary blobs.

Even though the second method would generally involve more total compose 
operations than the first, it would minimize the total bytes being composed. I 
doubt the difference would be significant for small numbers of temporary blobs 
and could help with large numbers of temporary blobs. That would seem like a 
reasonable enhancement to me.

 

 

 

 

> GSRecoverableWriterCommitter is generating excessive data blobs
> ---
>
> Key: FLINK-34696
> URL: https://issues.apache.org/jira/browse/FLINK-34696
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Simon-Shlomo Poil
>Priority: Major
>
> The `composeBlobs` method in 
> `org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter` is designed to 
> merge multiple small blobs into a single large blob using Google Cloud 
> Storage's compose method. This process is iterative, combining the result 
> from the previous iteration with 31 new blobs until all blobs are merged. 
> Upon completion of the composition, the method proceeds to remove the 
> temporary blobs.
> *Issue:*
> This methodology results in significant, unnecessary data storage consumption 
> during the blob composition process, incurring considerable costs due to 
> Google Cloud Storage pricing models.
> *Example to Illustrate the Problem:*
>  - Initial state: 64 blobs, each 1 GB in size (totaling 64 GB).
>  - After 1st step: 32 blobs are merged into a single blob, increasing total 
> storage to 96 GB (64 original + 32 GB new).
>  - After 2nd step: The newly created 32 GB blob is merged with 31 more blobs, 
> raising the total 

[jira] [Commented] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs

2024-03-15 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827528#comment-17827528
 ] 

Galen Warren commented on FLINK-34696:
--

Yes, the issue is recoverability. However, there is one thing you can do. 
Create a separate bucket in GCP for temporary files and initialize the 
filesystem configuration (via FileSystem.initialize) with 
*_gs.writer.temporary.bucket.name_* 
set to the name of that bucket. This will cause the GSRecoverableWriter to 
write intermediate/temporary files to that bucket instead of the "real" bucket. 
Then, you can apply a TTL [lifecycle 
policy|https://cloud.google.com/storage/docs/lifecycle] to the temporary bucket to 
have files be deleted after whatever TTL you want. 
 
If you try to recover to a check/savepoint farther back in time than that TTL 
interval, the recovery will probably fail, but this will let you dial in 
whatever recoverability period you want, i.e. longer (at higher storage cost) 
or shorter (at lower storage cost).
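
For reference, a minimal sketch of setting that option programmatically (the bucket 
name is a placeholder; the same key can typically be supplied through the Flink 
configuration instead):
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;

public class ConfigureGsTempBucket {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Route the GS recoverable writer's temporary blobs to a dedicated bucket.
        conf.setString("gs.writer.temporary.bucket.name", "my-flink-temp-bucket");

        // Must run before the gs:// filesystem is first used; plugin manager omitted in this sketch.
        FileSystem.initialize(conf, null);
    }
}
{code}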

> GSRecoverableWriterCommitter is generating excessive data blobs
> ---
>
> Key: FLINK-34696
> URL: https://issues.apache.org/jira/browse/FLINK-34696
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Reporter: Simon-Shlomo Poil
>Priority: Major
>
> The `composeBlobs` method in 
> `org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter` is designed to 
> merge multiple small blobs into a single large blob using Google Cloud 
> Storage's compose method. This process is iterative, combining the result 
> from the previous iteration with 31 new blobs until all blobs are merged. 
> Upon completion of the composition, the method proceeds to remove the 
> temporary blobs.
> *Issue:*
> This methodology results in significant, unnecessary data storage consumption 
> during the blob composition process, incurring considerable costs due to 
> Google Cloud Storage pricing models.
> *Example to Illustrate the Problem:*
>  - Initial state: 64 blobs, each 1 GB in size (totaling 64 GB).
>  - After 1st step: 32 blobs are merged into a single blob, increasing total 
> storage to 96 GB (64 original + 32 GB new).
>  - After 2nd step: The newly created 32 GB blob is merged with 31 more blobs, 
> raising the total to 159 GB.
>  - After 3rd step: The final blob is merged, culminating in a total of 223 GB 
> to combine the original 64 GB of data. This results in an overhead of 159 GB.
> *Impact:*
> This inefficiency has a profound impact, especially at scale, where terabytes 
> of data can incur overheads in the petabyte range, leading to unexpectedly 
> high costs. Additionally, we have observed an increase in storage exceptions 
> thrown by the Google Storage library, potentially linked to this issue.
> *Suggested Solution:*
> To mitigate this problem, we propose modifying the `composeBlobs` method to 
> immediately delete source blobs once they have been successfully combined. 
> This change could significantly reduce data duplication and associated costs. 
> However, the implications for data recovery and integrity need careful 
> consideration to ensure that this optimization does not compromise the 
> ability to recover data in case of a failure during the composition process.
> *Steps to Reproduce:*
> 1. Initiate the blob composition process in an environment with a significant 
> number of blobs (e.g., 64 blobs of 1 GB each).
> 2. Observe the temporary increase in data storage as blobs are iteratively 
> combined.
> 3. Note the final amount of data storage used compared to the initial total 
> size of the blobs.
> *Expected Behavior:*
> The blob composition process should minimize unnecessary data storage use, 
> efficiently managing resources to combine blobs without generating excessive 
> temporary data overhead.
> *Actual Behavior:*
> The current implementation results in significant temporary increases in data 
> storage, leading to high costs and potential system instability due to 
> frequent storage exceptions.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31619) Upgrade Stateful Functions to Flink 1.16.1

2023-08-14 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17754199#comment-17754199
 ] 

Galen Warren commented on FLINK-31619:
--

Created pull request: [[FLINK-31619] Upgrade Stateful Functions to Flink 1.16.1 
by galenwarren · Pull Request #331 · apache/flink-statefun 
(github.com)|https://github.com/apache/flink-statefun/pull/331]

> Upgrade Stateful Functions to Flink 1.16.1
> --
>
> Key: FLINK-31619
> URL: https://issues.apache.org/jira/browse/FLINK-31619
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Reporter: nowke
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available
>
> Upgrade Statefun to use Flink 1.16.1. Could you assign this to me?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25993) Option to disable Kryo.setRegistrationRequired(false)

2022-10-21 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622344#comment-17622344
 ] 

Galen Warren commented on FLINK-25993:
--

[~chesnay] [~diagonalbishop] 

I found your old discussion thread on this topic: 
[https://lists.apache.org/thread/0bzro3qmmsqrz5kh7lmx3xwfyj8wl5rk]

I'm also interested in this feature, and I have it working in a local fork. 
Would a PR be welcome here? Thanks.

> Option to disable Kryo.setRegistrationRequired(false)
> -
>
> Key: FLINK-25993
> URL: https://issues.apache.org/jira/browse/FLINK-25993
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.14.3
>Reporter: Shane Bishop
>Priority: Minor
>
> I would like to request a mechanism that a Flink library user could use to 
> optionally disable Kryo.setRegistrationRequired(false).
> The motivation is that Kyro.setRegistrationRequired(true) was made the safe 
> default in [this 
> commit|https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00]
>  (specifically the change was [this 
> line|https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00#diff-6d4638ca49aa0d0d9171ff04a0faa22e241f8320fda4a8a12c95853188d055a0R130]
>  in the commit). This default is overriden in the 1.14.3 Flink release (see 
> [KryoSerializer.java|https://github.com/apache/flink/blob/release-1.14.3/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L492]
>  and 
> [FlinkScalaKryoInstantiator.scala|https://github.com/apache/flink/blob/release-1.14.3/flink-scala/src/main/scala/org/apache/flink/runtime/types/FlinkScalaKryoInstantiator.scala#L46]).
> Reference to thread in mailing list: 
> https://lists.apache.org/thread/0bzro3qmmsqrz5kh7lmx3xwfyj8wl5rk



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27813) java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0

2022-05-29 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17543637#comment-17543637
 ] 

Galen Warren commented on FLINK-27813:
--

Cool! Glad it was something simple.

> java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0
> -
>
> Key: FLINK-27813
> URL: https://issues.apache.org/jira/browse/FLINK-27813
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor
>Affects Versions: statefun-3.2.0
>Reporter: Oleksandr Kazimirov
>Priority: Blocker
> Attachments: screenshot-1.png
>
>
> Issue was met after migration from 
> flink-statefun:3.1.1-java11
> to
> flink-statefun:3.2.0-java11
>  
> {code:java}
> ts.execution-paymentManualValidationRequestEgress-egress, Sink: 
> ua.aval.payments.execution-norkomExecutionRequested-egress, Sink: 
> ua.aval.payments.execution-shareStatusToManualValidation-egress) (1/1)#15863 
> (98a1f6bef9a435ef9d0292fefeb64022) switched from RUNNING to FAILED with 
> failure cause: java.lang.IllegalStateException: Unable to parse Netty 
> transport spec.\n\tat 
> org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.parseTransportSpec(NettyRequestReplyClientFactory.java:56)\n\tat
>  
> org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.createTransportClient(NettyRequestReplyClientFactory.java:39)\n\tat
>  
> org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider.functionOfType(HttpFunctionProvider.java:49)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:70)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:47)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:71)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:48)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:153)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:148)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)\n\tat
>  
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)\n\tat
>  
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)\n\tat
>  
> org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:180)\n\tat
>  
> org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)\n\tat
>  
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)\n\tat
>  
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)\n\tat
>  
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)\n\tat
>  
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)\n\tat
>  
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)\n\tat
>  org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)\n\tat 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)\n\tat 
> java.base/java.lang.Thread.run(Unknown Source)\nCaused by: 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException:
>  Time interval unit label 'm' does not match any of the recognized units: 
> DAYS: (d | day | days), HOURS: (h | hour | hours), MINUTES: (min | minute | 

[jira] [Commented] (FLINK-27813) java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0

2022-05-27 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17543099#comment-17543099
 ] 

Galen Warren commented on FLINK-27813:
--

Are you using RequestReplyFunctionBuilder to build your statefun job?

> java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0
> -
>
> Key: FLINK-27813
> URL: https://issues.apache.org/jira/browse/FLINK-27813
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor
>Affects Versions: statefun-3.2.0
>Reporter: Oleksandr
>Priority: Blocker
> Attachments: screenshot-1.png
>
>
> Issue was met after migration from 
> flink-statefun:3.1.1-java11
> to
> flink-statefun:3.2.0-java11
>  
> {code:java}
> ts.execution-paymentManualValidationRequestEgress-egress, Sink: 
> ua.aval.payments.execution-norkomExecutionRequested-egress, Sink: 
> ua.aval.payments.execution-shareStatusToManualValidation-egress) (1/1)#15863 
> (98a1f6bef9a435ef9d0292fefeb64022) switched from RUNNING to FAILED with 
> failure cause: java.lang.IllegalStateException: Unable to parse Netty 
> transport spec.\n\tat 
> org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.parseTransportSpec(NettyRequestReplyClientFactory.java:56)\n\tat
>  
> org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.createTransportClient(NettyRequestReplyClientFactory.java:39)\n\tat
>  
> org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider.functionOfType(HttpFunctionProvider.java:49)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:70)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:47)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:71)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:48)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:153)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:148)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)\n\tat
>  
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)\n\tat
>  
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)\n\tat
>  
> org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:180)\n\tat
>  
> org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)\n\tat
>  
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)\n\tat
>  
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)\n\tat
>  
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)\n\tat
>  
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)\n\tat
>  
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)\n\tat
>  org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)\n\tat 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)\n\tat 
> java.base/java.lang.Thread.run(Unknown Source)\nCaused by: 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException:
>  Time interval unit label 'm' does not match any of the recognized units: 
> DAYS: (d | day | days), HOURS: (h | hour | hours), 

[jira] [Commented] (FLINK-27813) java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0

2022-05-27 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17542929#comment-17542929
 ] 

Galen Warren commented on FLINK-27813:
--

Hmm, not sure. The relevant part of the error message seems to be this:
{quote}Time interval unit label 'm' does not match any of the recognized units: DAYS: 
(d | day | days), HOURS: (h | hour | hours), MINUTES: (min | minute | minutes), 
SECONDS: (s | sec | secs | second | seconds), MILLISECONDS: (ms | milli | 
millis | millisecond | milliseconds), MICROSECONDS: (µs | micro | micros | 
microsecond | microseconds), NANOSECONDS: (ns | nano | nanos | nanosecond | 
nanoseconds){quote}
Do you have a time unit in your spec labeled with an 'm'? For minutes, the 
recognized labels are 'min', 'minute', or 'minutes'.

> java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0
> -
>
> Key: FLINK-27813
> URL: https://issues.apache.org/jira/browse/FLINK-27813
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor
>Affects Versions: statefun-3.2.0
>Reporter: Oleksandr
>Priority: Blocker
> Attachments: screenshot-1.png
>
>
> Issue was met after migration from 
> flink-statefun:3.1.1-java11
> to
> flink-statefun:3.2.0-java8
>  
> {code:java}
> ts.execution-paymentManualValidationRequestEgress-egress, Sink: 
> ua.aval.payments.execution-norkomExecutionRequested-egress, Sink: 
> ua.aval.payments.execution-shareStatusToManualValidation-egress) (1/1)#15863 
> (98a1f6bef9a435ef9d0292fefeb64022) switched from RUNNING to FAILED with 
> failure cause: java.lang.IllegalStateException: Unable to parse Netty 
> transport spec.\n\tat 
> org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.parseTransportSpec(NettyRequestReplyClientFactory.java:56)\n\tat
>  
> org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.createTransportClient(NettyRequestReplyClientFactory.java:39)\n\tat
>  
> org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider.functionOfType(HttpFunctionProvider.java:49)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:70)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:47)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:71)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:48)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:153)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:148)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)\n\tat
>  
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)\n\tat
>  
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)\n\tat
>  
> org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:180)\n\tat
>  
> org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)\n\tat
>  
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)\n\tat
>  
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)\n\tat
>  
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)\n\tat
>  
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)\n\tat
>  
> 

[jira] [Commented] (FLINK-26462) Release Testing: Running Python UDF in different Execution Mode

2022-03-07 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17502469#comment-17502469
 ] 

Galen Warren commented on FLINK-26462:
--

I just ran through the testing steps and everything worked for me.

Just one issue: I think there might be a typo in the setup steps. This command:
$ python setup.py
... yields this error:
{code:java}
usage: setup.py [global_opts] cmd1 [cmd1_opts] [cmd2 [cmd2_opts] ...]
   or: setup.py --help [cmd1 cmd2 ...]
   or: setup.py --help-commands
   or: setup.py cmd --help
error: no commands supplied  {code}
Running the next line:
$ python setpy.py sdist
... succeeds.

> Release Testing: Running Python UDF in different Execution Mode
> ---
>
> Key: FLINK-26462
> URL: https://issues.apache.org/jira/browse/FLINK-26462
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Affects Versions: 1.15.0
>Reporter: Huang Xingbo
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.15.0
>
>
> h1. Setup
> * Build flink source code and compile source code
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python Virtual Environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source code. For more details, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setup.py
> $ python setpy.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a python udf job named demo.py in process mode
> {code:python}
> from pyflink.table.table_environment import TableEnvironment
> from pyflink.table.environment_settings import EnvironmentSettings
> from pyflink.table.udf import ScalarFunction, udf
> from pyflink.table import DataTypes, expressions as expr
> class SubtractOne(ScalarFunction):
> def eval(self, i):
> return i - 1
> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], 
> result_type=DataTypes.BIGINT())
> def add(i, j):
> return i + j
> def main():
> t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
> # process mode !
> 
> t_env.get_config().get_configuration().set_string("python.execution-mode", 
> "process")
> # optional values
> t_env.get_config().get_configuration().set_string("parallelism.default", 
> "2")
> add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
> subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
> t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 
> 'c'])
> result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
> print(result.to_pandas())
> if __name__ == '__main__':
> main()
> {code}
> * run the python udf job and watch the result
> {code:bash}
> $ python demo.py
>    _c0  c  _c2
> 0    3  1    1
> 1    7  2    1
> 2    4  3    1
> {code}
> * change the python udf job to multi-thread mode
> {code:python}
> from pyflink.table.table_environment import TableEnvironment
> from pyflink.table.environment_settings import EnvironmentSettings
> from pyflink.table.udf import ScalarFunction, udf
> from pyflink.table import DataTypes, expressions as expr
> class SubtractOne(ScalarFunction):
> def eval(self, i):
> return i - 1
> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], 
> result_type=DataTypes.BIGINT())
> def add(i, j):
> return i + j
> def main():
> t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
> # multi-thread mode
> 
> t_env.get_config().get_configuration().set_string("python.execution-mode", 
> "multi-thread")
> t_env.get_config().get_configuration().set_string("parallelism.default", 
> "2")
> add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
> subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
> t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 
> 'c'])
> result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
> print(result.to_pandas())
> if __name__ == '__main__':
> main()
> {code}
> * run the python udf job and watch the result
> {code:bash}
> $ python demo.py
>    _c0  c  _c2
> 0    3  1    1
> 1    7  2    1
> 2    4  3    1
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26451) Test GCS FileSystem w/ RecoverableWriter manually

2022-03-02 Thread Galen Warren (Jira)
Galen Warren created FLINK-26451:


 Summary: Test GCS FileSystem w/ RecoverableWriter manually
 Key: FLINK-26451
 URL: https://issues.apache.org/jira/browse/FLINK-26451
 Project: Flink
  Issue Type: Improvement
  Components: FileSystems
Reporter: Galen Warren
 Fix For: 1.15.0


This is a manual testing issue for 
[FLINK-11838|https://github.com/apache/flink/pull/15599].

The linked issue adds support for a Flink FileSystem backed by Google Cloud 
Storage (GCS). This FileSystem implementation includes RecoverableWriter 
support, which allows it to be used with [FileSystem 
connector|https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/]
 sinks.

Suggested things to test:
 * Validate writing checkpoints and/or savepoints to a GCS bucket via a *gs://* 
endpoint
 * Validate reading from and writing to *gs://* endpoints via FileSystem 
connector sources and sinks

Note that a Google Cloud account is required to access GCS buckets. 
Instructions for how to provide the necessary credentials are included in the 
documentation.
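
For the checkpoint/savepoint test, something along these lines should be enough to 
exercise the gs:// path (the bucket name is a placeholder; assumes the 
flink-gs-fs-hadoop plugin is on the plugins path and GCP credentials are available):
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GcsCheckpointSmokeTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 10 seconds into the GCS bucket.
        env.enableCheckpointing(10_000L);
        env.getCheckpointConfig().setCheckpointStorage("gs://<your-bucket>/checkpoints");

        // Any long-running pipeline will do; this one just counts upward.
        env.fromSequence(0, Long.MAX_VALUE).print();

        env.execute("gcs-checkpoint-smoke-test");
    }
}
{code}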

Documentation:
 * 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/gcs/

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26375) Fix Statefun Golang SDK to return nil from Context.Caller when there is no caller

2022-02-25 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498194#comment-17498194
 ] 

Galen Warren commented on FLINK-26375:
--

PR created: [[FLINK-26375][statefun-golang-sdk] Fix Statefun Golang SDK to 
return nil from Context.Caller when there is no caller by galenwarren · Pull 
Request #304 · apache/flink-statefun 
(github.com)|https://github.com/apache/flink-statefun/pull/304]

> Fix Statefun Golang SDK to return nil from Context.Caller when there is no 
> caller
> -
>
> Key: FLINK-26375
> URL: https://issues.apache.org/jira/browse/FLINK-26375
> Project: Flink
>  Issue Type: Bug
>  Components: Stateful Functions
>Affects Versions: statefun-3.0.0, statefun-3.1.0, statefun-3.2.0, 
> statefun-3.1.1
>Reporter: Galen Warren
>Priority: Minor
>  Labels: pull-request-available
> Fix For: statefun-3.2.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When a stateful function is invoked from an ingress – i.e. when there is no 
> upstream function caller -- Context.Caller() should return nil. Currently it 
> returns a pointer to a zero-value Address. This issue would fix that.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26375) Fix Statefun Golang SDK to return nil from Context.Caller when there is no caller

2022-02-25 Thread Galen Warren (Jira)
Galen Warren created FLINK-26375:


 Summary: Fix Statefun Golang SDK to return nil from Context.Caller 
when there is no caller
 Key: FLINK-26375
 URL: https://issues.apache.org/jira/browse/FLINK-26375
 Project: Flink
  Issue Type: Bug
  Components: Stateful Functions
Affects Versions: statefun-3.1.1, statefun-3.2.0, statefun-3.1.0, 
statefun-3.0.0
Reporter: Galen Warren
 Fix For: statefun-3.2.0


When a stateful function is invoked from an ingress – i.e. when there is no 
upstream function caller -- Context.Caller() should return nil. Currently it 
returns a pointer to a zero-value Address. This issue would fix that.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26340) Add ability in Golang SDK to create new statefun.Context from existing one, but with a new underlying context.Context

2022-02-24 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17497745#comment-17497745
 ] 

Galen Warren commented on FLINK-26340:
--

Just a note for anyone who might arrive here – we changed the implementation in 
the PR to add a top-level DeriveContext function instead of changing the 
statefun.Context interface to add a WithContext method. Otherwise, the approach 
is the same.

> Add ability in Golang SDK to create new statefun.Context from existing one, 
> but with a new underlying context.Context
> -
>
> Key: FLINK-26340
> URL: https://issues.apache.org/jira/browse/FLINK-26340
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Affects Versions: statefun-3.3.0
>Reporter: Galen Warren
>Assignee: Galen Warren
>Priority: Minor
>  Labels: pull-request-available
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> In the Golang SDK, statefun.Context embeds the context.Context interface and 
> is implemented by the statefunContext struct, which embeds a context.Context. 
> To support common patterns in Golang related to adding values to context, it 
> would be useful to be able to create a derived statefun.Context that is 
> equivalent to the original in terms of statefun functionality but which wraps 
> a different context.Context.
> The proposal is to add a:
> WithContext(ctx context.Context) statefun.Context
> ... method to the statefun.Context interface and implement it on 
> statefunContext. This method would return the derived statefun context.
> This is a breaking change to statefun.Context, but, given its purpose, we do 
> not expect there to be implementations of this interface outside the Golang 
> SDK. 
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26340) Add ability in Golang SDK to create new statefun.Context from existing one, but with a new underlying context.Context

2022-02-23 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17496903#comment-17496903
 ] 

Galen Warren commented on FLINK-26340:
--

PR created: [[FLINK-26340][statefun-golang-sdk] Add ability in Golang SDK to 
create new statefun.Context from existing one, but with a new underlying 
context.Context by galenwarren · Pull Request #303 · apache/flink-statefun 
(github.com)|https://github.com/apache/flink-statefun/pull/303]

> Add ability in Golang SDK to create new statefun.Context from existing one, 
> but with a new underlying context.Context
> -
>
> Key: FLINK-26340
> URL: https://issues.apache.org/jira/browse/FLINK-26340
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Affects Versions: statefun-3.3.0
>Reporter: Galen Warren
>Priority: Minor
>  Labels: pull-request-available
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> In the Golang SDK, statefun.Context embeds the context.Context interface and 
> is implemented by the statefunContext struct, which embeds a context.Context. 
> To support common patterns in Golang related to adding values to context, it 
> would be useful to be able to create a derived statefun.Context that is 
> equivalent to the original in terms of statefun functionality but which wraps 
> a different context.Context.
> The proposal is to add a:
> WithContext(ctx context.Context) statefun.Context
> ... method to the statefun.Context interface and implement it on 
> statefunContext. This method would return the derived statefun context.
> This is a breaking change to statefun.Context, but, given its purpose, we do 
> not expect there to be implementations of this interface outside the Golang 
> SDK. 
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26340) Add ability in Golang SDK to create new statefun.Context from existing one, but with a new underlying context.Context

2022-02-23 Thread Galen Warren (Jira)
Galen Warren created FLINK-26340:


 Summary: Add ability in Golang SDK to create new statefun.Context 
from existing one, but with a new underlying context.Context
 Key: FLINK-26340
 URL: https://issues.apache.org/jira/browse/FLINK-26340
 Project: Flink
  Issue Type: Improvement
  Components: Stateful Functions
Affects Versions: statefun-3.3.0
Reporter: Galen Warren


In the Golang SDK, statefun.Context embeds the context.Context interface and is 
implemented by the statefunContext struct, which embeds a context.Context. To 
support common patterns in Golang related to adding values to context, it would 
be useful to be able to create a derived statefun.Context that is equivalent to 
the original in terms of statefun functionality but which wraps a different 
context.Context.

The proposal is to add a:

WithContext(ctx context.Context) statefun.Context

... method to the statefun.Context interface and implement it on 
statefunContext. This method would return the derived statefun context.

This is a breaking change to statefun.Context, but, given its purpose, we do 
not expect there to be implementations of this interface outside the Golang 
SDK. 

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25933) Allow configuring different transports in RequestReplyFunctionBuilder

2022-02-09 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17489697#comment-17489697
 ] 

Galen Warren commented on FLINK-25933:
--

PR created: https://github.com/apache/flink-statefun/pull/300.

> Allow configuring different transports in RequestReplyFunctionBuilder
> -
>
> Key: FLINK-25933
> URL: https://issues.apache.org/jira/browse/FLINK-25933
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Reporter: Igal Shilman
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available
>
> Currently it is not possible to configure the type of the transport used 
> while using the data stream integration.
> It would be useful to do so.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-25873) Using RocksDB state backend causes java.lang.ClassNotFoundException: org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream

2022-01-29 Thread Galen Warren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Galen Warren closed FLINK-25873.

Resolution: Not A Problem

> Using RocksDB state backend causes java.lang.ClassNotFoundException: 
> org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream
> ---
>
> Key: FLINK-25873
> URL: https://issues.apache.org/jira/browse/FLINK-25873
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.15.0
>Reporter: Galen Warren
>Priority: Major
> Fix For: 1.15.0
>
>
> From the current master branch, attempting to use the RocksDb state backend 
> yields this error when a checkpoint write is attempted:
> {quote}{{Caused by: java.lang.NoClassDefFoundError: 
> org/apache/flink/runtime/state/CheckpointStreamFactory$CheckpointStateOutputStream}}
> {{    at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.asyncSnapshot(RocksIncrementalSnapshotStrategy.java:180)}}
> {{    at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.asyncSnapshot(RocksIncrementalSnapshotStrategy.java:83)}}
> {{    at 
> org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:80)}}
> {{    at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:551)}}
> {{    at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:242)}}
> {{    at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)}}
> {{    at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:647)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:320)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1251)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1239)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1196)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)}}
> {{    at 
> 

[jira] [Commented] (FLINK-25873) Using RocksDB state backend causes java.lang.ClassNotFoundException: org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream

2022-01-29 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17484241#comment-17484241
 ] 

Galen Warren commented on FLINK-25873:
--

Yes, this was a problem on my end. I hadn't removed the Scala suffix from 
`flink-statebackend-rocksdb` and was pulling in an old/different version.

Sorry for the false alarm. This can be closed.

> Using RocksDB state backend causes java.lang.ClassNotFoundException: 
> org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream
> ---
>
> Key: FLINK-25873
> URL: https://issues.apache.org/jira/browse/FLINK-25873
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.15.0
>Reporter: Galen Warren
>Priority: Major
> Fix For: 1.15.0
>
>
> From the current master branch, attempting to use the RocksDb state backend 
> yields this error when a checkpoint write is attempted:
> {quote}{{Caused by: java.lang.NoClassDefFoundError: 
> org/apache/flink/runtime/state/CheckpointStreamFactory$CheckpointStateOutputStream}}
> {{    at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.asyncSnapshot(RocksIncrementalSnapshotStrategy.java:180)}}
> {{    at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.asyncSnapshot(RocksIncrementalSnapshotStrategy.java:83)}}
> {{    at 
> org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:80)}}
> {{    at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:551)}}
> {{    at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:242)}}
> {{    at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)}}
> {{    at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:647)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:320)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1251)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1239)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1196)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)}}
> {{    at 
> 

[jira] [Commented] (FLINK-25873) Using RocksDB state backend causes java.lang.ClassNotFoundException: org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream

2022-01-29 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17484200#comment-17484200
 ] 

Galen Warren commented on FLINK-25873:
--

Actually, this may be a caching issue on my side. I'll report back after I 
investigate some more.

> Using RocksDB state backend causes java.lang.ClassNotFoundException: 
> org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream
> ---
>
> Key: FLINK-25873
> URL: https://issues.apache.org/jira/browse/FLINK-25873
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.15.0
>Reporter: Galen Warren
>Priority: Major
> Fix For: 1.15.0
>
>
> From the current master branch, attempting to use the RocksDb state backend 
> yields this error when a checkpoint write is attempted:
> {quote}{{Caused by: java.lang.NoClassDefFoundError: 
> org/apache/flink/runtime/state/CheckpointStreamFactory$CheckpointStateOutputStream}}
> {{    at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.asyncSnapshot(RocksIncrementalSnapshotStrategy.java:180)}}
> {{    at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.asyncSnapshot(RocksIncrementalSnapshotStrategy.java:83)}}
> {{    at 
> org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:80)}}
> {{    at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:551)}}
> {{    at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:242)}}
> {{    at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)}}
> {{    at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:647)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:320)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1251)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1239)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1196)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)}}
> {{    at 
> 

[jira] [Commented] (FLINK-25873) Using RocksDB state backend causes java.lang.ClassNotFoundException: org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream

2022-01-28 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17483905#comment-17483905
 ] 

Galen Warren commented on FLINK-25873:
--

Sorry, the link to the current location of CheckpointStateOutputStream should be 
https://github.com/apache/flink/blob/e2f609c92918d669f2a1c0b69184878d691e3097/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateOutputStream.java#L43.

> Using RocksDB state backend causes java.lang.ClassNotFoundException: 
> org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream
> ---
>
> Key: FLINK-25873
> URL: https://issues.apache.org/jira/browse/FLINK-25873
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.15.0
>Reporter: Galen Warren
>Priority: Major
> Fix For: 1.15.0
>
>
> From the current master branch, attempting to use the RocksDb state backend 
> yields this error when a checkpoint write is attempted:
> {quote}{{Caused by: java.lang.NoClassDefFoundError: 
> org/apache/flink/runtime/state/CheckpointStreamFactory$CheckpointStateOutputStream}}
> {{    at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.asyncSnapshot(RocksIncrementalSnapshotStrategy.java:180)}}
> {{    at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.asyncSnapshot(RocksIncrementalSnapshotStrategy.java:83)}}
> {{    at 
> org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:80)}}
> {{    at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:551)}}
> {{    at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:242)}}
> {{    at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)}}
> {{    at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:647)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:320)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1251)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1239)}}
> {{    at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1196)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)}}
> {{    at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)}}

[jira] [Created] (FLINK-25873) Using RocksDB state backend causes java.lang.ClassNotFoundException: org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream

2022-01-28 Thread Galen Warren (Jira)
Galen Warren created FLINK-25873:


 Summary: Using RocksDB state backend causes 
java.lang.ClassNotFoundException: 
org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream
 Key: FLINK-25873
 URL: https://issues.apache.org/jira/browse/FLINK-25873
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Galen Warren
 Fix For: 1.15.0


From the current master branch, attempting to use the RocksDb state backend 
yields this error when a checkpoint write is attempted:
{quote}{{Caused by: java.lang.NoClassDefFoundError: 
org/apache/flink/runtime/state/CheckpointStreamFactory$CheckpointStateOutputStream}}
{{    at 
org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.asyncSnapshot(RocksIncrementalSnapshotStrategy.java:180)}}
{{    at 
org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.asyncSnapshot(RocksIncrementalSnapshotStrategy.java:83)}}
{{    at 
org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:80)}}
{{    at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:551)}}
{{    at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:242)}}
{{    at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)}}
{{    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)}}
{{    at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227)}}
{{    at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212)}}
{{    at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)}}
{{    at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:647)}}
{{    at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:320)}}
{{    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1251)}}
{{    at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)}}
{{    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1239)}}
{{    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1196)}}
{{    at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)}}
{{    at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)}}
{{    at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)}}
{{    at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)}}
{{    at 
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)}}
{{    at 
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)}}
{{    at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)}}
{{    at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)}}
{{    at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)}}
{{    at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)}}
{{    at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)}}
{{    at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)}}
{{    at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)}}
{{    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:517)}}
{{    at 

[jira] [Commented] (FLINK-25577) Update GCS documentation

2022-01-28 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17483803#comment-17483803
 ] 

Galen Warren commented on FLINK-25577:
--

[~xtsong] Thanks! I've really enjoyed working with you as well and I appreciate 
all your help and guidance. I'll keep a lookout for any feedback during the 
testing period. Enjoy your holiday!

> Update GCS documentation
> 
>
> Key: FLINK-25577
> URL: https://issues.apache.org/jira/browse/FLINK-25577
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / FileSystem, Documentation
>Reporter: David Morávek
>Assignee: Galen Warren
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Update GCS documentation with respect to:
> - flink-shaded-hadoop artifacts and hadoop version 2.8.3 that are no longer 
> supported as of 1.15
> - The recoverable writer new feature
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/gcs/



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25577) Update GCS documentation

2022-01-24 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17481473#comment-17481473
 ] 

Galen Warren commented on FLINK-25577:
--

Here's a second try at a release note:

The `flink-gs-fs-hadoop` FileSystem plugin for Google Cloud Storage (GCS) has 
been introduced. This allows Flink to read data from and write data to GCS via 
paths with the `gs://` scheme, and it provides functionality for GCS similar to 
what `flink-s3-fs-hadoop` provides for Amazon S3, for example.

In particular, this plugin supports the `RecoverableWriter` interface, which 
allows it to be used with file sinks.

Under the hood, the `flink-gs-fs-hadoop` uses Google's `gcs-connector` Hadoop 
library for basic read/write operations, and it uses Google's 
`google-cloud-storage` library to implement `RecoverableWriter` functionality.
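
As an illustration only (not part of the proposed release note text), writing to GCS through a file sink might look like the sketch below; the bucket and path are placeholders, and the plugin must be available in Flink's plugins directory:

{code:java}
// Hedged sketch: writes strings to GCS via the flink-gs-fs-hadoop plugin.
// "gs://my-bucket/output" is a placeholder, not a real bucket.
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class GcsSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // in-progress part files are committed on checkpoints

        DataStream<String> stream = env.fromElements("a", "b", "c");

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("gs://my-bucket/output"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .build();

        stream.addSink(sink);
        env.execute("gcs-sink-example");
    }
}
{code}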

 

 

 

 

 

> Update GCS documentation
> 
>
> Key: FLINK-25577
> URL: https://issues.apache.org/jira/browse/FLINK-25577
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / FileSystem, Documentation
>Reporter: David Morávek
>Assignee: Galen Warren
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Update GCS documentation with respect to:
> - flink-shaded-hadoop artifacts and hadoop version 2.8.3 that are no longer 
> supported as of 1.15
> - The recoverable writer new feature
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/gcs/



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25790) Support authentication via core-site.xml in GCS FileSystem plugin

2022-01-24 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17481453#comment-17481453
 ] 

Galen Warren commented on FLINK-25790:
--

Created PR: https://github.com/apache/flink/pull/18489

> Support authentication via core-site.xml in GCS FileSystem plugin
> -
>
> Key: FLINK-25790
> URL: https://issues.apache.org/jira/browse/FLINK-25790
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems
>Affects Versions: 1.15.0
>Reporter: Galen Warren
>Priority: Major
>  Labels: pull-request-available
>
> Add support for authentication via core-site.xml to the new GCS FileSystem 
> connector, recently added via [FLINK-11838] Create RecoverableWriter for GCS 
> - ASF JIRA (apache.org).
> Specifically, make the RecoverableWriter use explicit credentials supplied in 
> core-site.xml in the "google.cloud.auth.service.account.json.keyfile" 
> property. Otherwise, it should use implicit credentials, as it already does.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25790) Support authentication via core-site.xml in GCS FileSystem plugin

2022-01-24 Thread Galen Warren (Jira)
Galen Warren created FLINK-25790:


 Summary: Support authentication via core-site.xml in GCS 
FileSystem plugin
 Key: FLINK-25790
 URL: https://issues.apache.org/jira/browse/FLINK-25790
 Project: Flink
  Issue Type: Improvement
  Components: FileSystems
Affects Versions: 1.15.0
Reporter: Galen Warren


Add support for authentication via core-site.xml to the new GCS FileSystem 
connector, recently added via [FLINK-11838] Create RecoverableWriter for GCS - 
ASF JIRA (apache.org).

Specifically, make the RecoverableWriter use explicit credentials supplied in 
core-site.xml in the "google.cloud.auth.service.account.json.keyfile" property. 
Otherwise, it should use implicit credentials, as it already does.
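
For illustration, the explicit-credentials case described above corresponds to a core-site.xml entry along these lines (the keyfile path is a placeholder):

{code:xml}
<!-- Hypothetical core-site.xml snippet; the keyfile path is a placeholder. -->
<configuration>
  <property>
    <name>google.cloud.auth.service.account.json.keyfile</name>
    <value>/path/to/service-account-key.json</value>
  </property>
</configuration>
{code}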



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25772) GCS filesystem fails license checker

2022-01-24 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17481128#comment-17481128
 ] 

Galen Warren commented on FLINK-25772:
--

I believe this is a duplicate of FLINK-25758 (GCS Filesystem implementation 
fails on Java 11 tests due to licensing issues), which we're trying to address 
via [PR #18452|https://github.com/apache/flink/pull/18452].

> GCS filesystem fails license checker
> 
>
> Key: FLINK-25772
> URL: https://issues.apache.org/jira/browse/FLINK-25772
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
>
> FLINK-11838 made the {{LicenseChecker}} fail with the following exception 
> when compiling using Java 11.
> {code}
> 00:30:51,995 ERROR org.apache.flink.tools.ci.licensecheck.JarFileChecker  
>   [] - File '/javax/annotation/security/package.html' in jar 
> '/tmp/flink-validation-deployment/org/apache/flink/flink-gs-fs-hadoop/1.15-SNAPSHOT/flink-gs-fs-hadoop-1.15-20220121.001624-1.jar'
>  contains match with forbidden regex 'gnu ?\R?[\s/#]*general 
> ?\R?[\s/#]*public ?\R?[\s/#]*license'.
> 00:30:51,997 ERROR org.apache.flink.tools.ci.licensecheck.JarFileChecker  
>   [] - File '/javax/annotation/package.html' in jar 
> '/tmp/flink-validation-deployment/org/apache/flink/flink-gs-fs-hadoop/1.15-SNAPSHOT/flink-gs-fs-hadoop-1.15-20220121.001624-1.jar'
>  contains match with forbidden regex 'gnu ?\R?[\s/#]*general 
> ?\R?[\s/#]*public ?\R?[\s/#]*license'.
> 00:32:17,194 WARN  org.apache.flink.tools.ci.licensecheck.LicenseChecker  
>   [] - Found a total of 3 severe license issues
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=29841=logs=946871de-358d-5815-3994-8175615bc253=e0240c62-4570-5d1c-51af-dd63d2093da1



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25577) Update GCS documentation

2022-01-23 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17480732#comment-17480732
 ] 

Galen Warren commented on FLINK-25577:
--

[~MartijnVisser] I'm trying to add the release note, but I can't figure out how 
to set (or view) that field on this ticket. I'm wondering whether I have the 
proper permissions. Sorry if I'm not seeing something obvious.

I was thinking this for the content of the release note:
{quote}Add the flink-gs-fs-hadoop FileSystem plugin for Google Cloud Storage, 
with RecoverableWriter support.
{quote}
 

> Update GCS documentation
> 
>
> Key: FLINK-25577
> URL: https://issues.apache.org/jira/browse/FLINK-25577
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / FileSystem, Documentation
>Reporter: David Morávek
>Assignee: Galen Warren
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Update GCS documentation with respect to:
> - flink-shaded-hadoop artifacts and hadoop version 2.8.3 that are no longer 
> supported as of 1.15
> - The recoverable writer new feature
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/gcs/



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25758) GCS Filesystem implementation fails on Java 11 tests due to licensing issues

2022-01-23 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17480728#comment-17480728
 ] 

Galen Warren commented on FLINK-25758:
--

PR created: [https://github.com/apache/flink/pull/18452]

 

> GCS Filesystem implementation fails on Java 11 tests due to licensing issues
> 
>
> Key: FLINK-25758
> URL: https://issues.apache.org/jira/browse/FLINK-25758
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: Martijn Visser
>Assignee: Galen Warren
>Priority: Blocker
>  Labels: pull-request-available
>
> {code}
> 00:33:45,410 DEBUG org.apache.flink.tools.ci.licensecheck.NoticeFileChecker   
>   [] - Dependency io.netty:netty-common:4.1.51.Final is mentioned in NOTICE 
> file /__w/2/s/flink-python/src/main/resources/META-INF/NOTICE, but was not 
> mentioned by the build output as a bundled dependency
> 00:33:45,411 ERROR org.apache.flink.tools.ci.licensecheck.NoticeFileChecker   
>   [] - Could not find dependency javax.annotation:javax.annotation-api:1.3.2 
> in NOTICE file 
> /__w/2/s/flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/NOTICE
> 00:33:45,536 INFO  org.apache.flink.tools.ci.licensecheck.JarFileChecker  
>   [] - Checking directory /tmp/flink-validation-deployment with a total of 
> 197 jar files.
> 00:34:18,554 ERROR org.apache.flink.tools.ci.licensecheck.JarFileChecker  
>   [] - File '/javax/annotation/security/package.html' in jar 
> '/tmp/flink-validation-deployment/org/apache/flink/flink-gs-fs-hadoop/1.15-SNAPSHOT/flink-gs-fs-hadoop-1.15-20220122.001944-1.jar'
>  contains match with forbidden regex 'gnu ?\R?[\s/#]*general 
> ?\R?[\s/#]*public ?\R?[\s/#]*license'.
> 00:34:18,555 ERROR org.apache.flink.tools.ci.licensecheck.JarFileChecker  
>   [] - File '/javax/annotation/package.html' in jar 
> '/tmp/flink-validation-deployment/org/apache/flink/flink-gs-fs-hadoop/1.15-SNAPSHOT/flink-gs-fs-hadoop-1.15-20220122.001944-1.jar'
>  contains match with forbidden regex 'gnu ?\R?[\s/#]*general 
> ?\R?[\s/#]*public ?\R?[\s/#]*license'.
> 00:35:46,612 WARN  org.apache.flink.tools.ci.licensecheck.LicenseChecker  
>   [] - Found a total of 3 severe license issues
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=29932=logs=946871de-358d-5815-3994-8175615bc253=e0240c62-4570-5d1c-51af-dd63d2093da1



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25758) GCS Filesystem implementation fails on Java 11 tests due to licensing issues

2022-01-23 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17480685#comment-17480685
 ] 

Galen Warren commented on FLINK-25758:
--

[~MartijnVisser] PR created: https://github.com/apache/flink/pull/18452

> GCS Filesystem implementation fails on Java 11 tests due to licensing issues
> 
>
> Key: FLINK-25758
> URL: https://issues.apache.org/jira/browse/FLINK-25758
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: Martijn Visser
>Assignee: Galen Warren
>Priority: Blocker
>  Labels: pull-request-available
>
> {code}
> 00:33:45,410 DEBUG org.apache.flink.tools.ci.licensecheck.NoticeFileChecker   
>   [] - Dependency io.netty:netty-common:4.1.51.Final is mentioned in NOTICE 
> file /__w/2/s/flink-python/src/main/resources/META-INF/NOTICE, but was not 
> mentioned by the build output as a bundled dependency
> 00:33:45,411 ERROR org.apache.flink.tools.ci.licensecheck.NoticeFileChecker   
>   [] - Could not find dependency javax.annotation:javax.annotation-api:1.3.2 
> in NOTICE file 
> /__w/2/s/flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/NOTICE
> 00:33:45,536 INFO  org.apache.flink.tools.ci.licensecheck.JarFileChecker  
>   [] - Checking directory /tmp/flink-validation-deployment with a total of 
> 197 jar files.
> 00:34:18,554 ERROR org.apache.flink.tools.ci.licensecheck.JarFileChecker  
>   [] - File '/javax/annotation/security/package.html' in jar 
> '/tmp/flink-validation-deployment/org/apache/flink/flink-gs-fs-hadoop/1.15-SNAPSHOT/flink-gs-fs-hadoop-1.15-20220122.001944-1.jar'
>  contains match with forbidden regex 'gnu ?\R?[\s/#]*general 
> ?\R?[\s/#]*public ?\R?[\s/#]*license'.
> 00:34:18,555 ERROR org.apache.flink.tools.ci.licensecheck.JarFileChecker  
>   [] - File '/javax/annotation/package.html' in jar 
> '/tmp/flink-validation-deployment/org/apache/flink/flink-gs-fs-hadoop/1.15-SNAPSHOT/flink-gs-fs-hadoop-1.15-20220122.001944-1.jar'
>  contains match with forbidden regex 'gnu ?\R?[\s/#]*general 
> ?\R?[\s/#]*public ?\R?[\s/#]*license'.
> 00:35:46,612 WARN  org.apache.flink.tools.ci.licensecheck.LicenseChecker  
>   [] - Found a total of 3 severe license issues
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=29932=logs=946871de-358d-5815-3994-8175615bc253=e0240c62-4570-5d1c-51af-dd63d2093da1



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25758) GCS Filesystem implementation fails on Java 11 tests due to licensing issues

2022-01-22 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17480471#comment-17480471
 ] 

Galen Warren commented on FLINK-25758:
--

Sure, I'll take a look. Is there a way to run the license check locally?

> GCS Filesystem implementation fails on Java 11 tests due to licensing issues
> 
>
> Key: FLINK-25758
> URL: https://issues.apache.org/jira/browse/FLINK-25758
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: Martijn Visser
>Assignee: Galen Warren
>Priority: Blocker
>
> {code}
> 00:33:45,410 DEBUG org.apache.flink.tools.ci.licensecheck.NoticeFileChecker   
>   [] - Dependency io.netty:netty-common:4.1.51.Final is mentioned in NOTICE 
> file /__w/2/s/flink-python/src/main/resources/META-INF/NOTICE, but was not 
> mentioned by the build output as a bundled dependency
> 00:33:45,411 ERROR org.apache.flink.tools.ci.licensecheck.NoticeFileChecker   
>   [] - Could not find dependency javax.annotation:javax.annotation-api:1.3.2 
> in NOTICE file 
> /__w/2/s/flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/NOTICE
> 00:33:45,536 INFO  org.apache.flink.tools.ci.licensecheck.JarFileChecker  
>   [] - Checking directory /tmp/flink-validation-deployment with a total of 
> 197 jar files.
> 00:34:18,554 ERROR org.apache.flink.tools.ci.licensecheck.JarFileChecker  
>   [] - File '/javax/annotation/security/package.html' in jar 
> '/tmp/flink-validation-deployment/org/apache/flink/flink-gs-fs-hadoop/1.15-SNAPSHOT/flink-gs-fs-hadoop-1.15-20220122.001944-1.jar'
>  contains match with forbidden regex 'gnu ?\R?[\s/#]*general 
> ?\R?[\s/#]*public ?\R?[\s/#]*license'.
> 00:34:18,555 ERROR org.apache.flink.tools.ci.licensecheck.JarFileChecker  
>   [] - File '/javax/annotation/package.html' in jar 
> '/tmp/flink-validation-deployment/org/apache/flink/flink-gs-fs-hadoop/1.15-SNAPSHOT/flink-gs-fs-hadoop-1.15-20220122.001944-1.jar'
>  contains match with forbidden regex 'gnu ?\R?[\s/#]*general 
> ?\R?[\s/#]*public ?\R?[\s/#]*license'.
> 00:35:46,612 WARN  org.apache.flink.tools.ci.licensecheck.LicenseChecker  
>   [] - Found a total of 3 severe license issues
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=29932=logs=946871de-358d-5815-3994-8175615bc253=e0240c62-4570-5d1c-51af-dd63d2093da1



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25577) Update GCS documentation

2022-01-20 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17479647#comment-17479647
 ] 

Galen Warren commented on FLINK-25577:
--

[~MartijnVisser] [~dmvk] 

I've created a PR for this:  [https://github.com/apache/flink/pull/18430]

There is still a bit more work to do on the docs, but I wanted to get the PR 
started. Also, I have a question in the PR for someone who knows how the docs 
work.

 

> Update GCS documentation
> 
>
> Key: FLINK-25577
> URL: https://issues.apache.org/jira/browse/FLINK-25577
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / FileSystem, Documentation
>Reporter: David Morávek
>Assignee: Galen Warren
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Update GCS documentation with respect to:
> - flink-shaded-hadoop artifacts and hadoop version 2.8.3 that are no longer 
> supported as of 1.15
> - The recoverable writer new feature
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/gcs/



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2022-01-20 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17479454#comment-17479454
 ] 

Galen Warren commented on FLINK-11838:
--

[~MartijnVisser] I'd say let's handle documentation and release notes on the 
other one, https://issues.apache.org/jira/browse/FLINK-25577, since this one is 
now closed.

 

Sorry if this is a dumb question, but what is involved in adding this to the 
release notes?

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.15.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25577) Update GCS documentation not to use no longer supported flink-shaded-hadoop artifacts

2022-01-07 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17470705#comment-17470705
 ] 

Galen Warren commented on FLINK-25577:
--

Why don't we leave it open, just so that it doesn't get lost in the event 
something goes horribly wrong with the in-progress PR? I think that's unlikely, 
though. Once we complete that PR, I can come back here, update the status, and 
then it could be closed. What do you think?

> Update GCS documentation not to use no longer supported flink-shaded-hadoop 
> artifacts
> -
>
> Key: FLINK-25577
> URL: https://issues.apache.org/jira/browse/FLINK-25577
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / FileSystem, Documentation
>Reporter: David Morávek
>Priority: Blocker
> Fix For: 1.15.0
>
>
> The GCS documentation refers to artifacts that are no longer supported. Also, 
> Hadoop version 2.8.3 will no longer be supported as of 1.15.
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/gcs/



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25577) Update GCS documentation not to use no longer supported flink-shaded-hadoop artifacts

2022-01-07 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17470701#comment-17470701
 ] 

Galen Warren commented on FLINK-25577:
--

I just wanted to mention that it's likely that a GCS FileSystem plugin will 
land in the next release, via [FLINK-11838] Create RecoverableWriter for GCS - 
ASF JIRA (apache.org).

This would be similar to the S3 plugins that are documented here: [Amazon S3 | 
Apache 
Flink|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/].

There will presumably be similar documentation for the GCS plugin as part of 
this effort.

The PR in progress is here: [[FLINK-11838][flink-gs-fs-hadoop] Create Google 
Storage file system with recoverable writer support by galenwarren · Pull 
Request #15599 · apache/flink 
(github.com)|https://github.com/apache/flink/pull/15599].

> Update GCS documentation not to use no longer supported flink-shaded-hadoop 
> artifacts
> -
>
> Key: FLINK-25577
> URL: https://issues.apache.org/jira/browse/FLINK-25577
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / FileSystem, Documentation
>Reporter: David Morávek
>Priority: Blocker
> Fix For: 1.15.0
>
>
> The GCS documentation refers to artifacts that are no longer supported. Also, 
> Hadoop version 2.8.3 will no longer be supported as of 1.15.
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/gcs/



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25197) Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type `java.time.Duration` not supported by default: add Module "org.apache.flink.shaded.jackson2

2021-12-26 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17465402#comment-17465402
 ] 

Galen Warren commented on FLINK-25197:
--

PR created: https://github.com/apache/flink-statefun/pull/282

On Sun, Dec 26, 2021 at 11:43 AM ASF GitHub Bot (Jira) 



> Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type 
> `java.time.Duration` not supported by default: add Module 
> "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
>  to enable handling 
> ---
>
> Key: FLINK-25197
> URL: https://issues.apache.org/jira/browse/FLINK-25197
> Project: Flink
>  Issue Type: Bug
>  Components: Stateful Functions
>Affects Versions: statefun-3.1.0
>Reporter: Galen Warren
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available
> Fix For: statefun-3.1.0
>
>
> When using RequestReplyFunctionBuilder to build a stateful functions job, the 
> job fails at runtime with:
> Java 8 date/time type `java.time.Duration` not supported by default: add 
> Module 
> "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
>  to enable handling 
> It appears this is because, in 
> [RequestReplyFunctionBuilder::transportClientPropertiesAsObjectNode|https://github.com/apache/flink-statefun/blob/b4ba9547b8f0105a28544fd28a5e0433666e9023/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java#L127],
>  a default instance of ObjectMapper is used to serialize the client 
> properties, which now include a java.time.Duration. There is a 
> [StateFunObjectMapper|https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java]
>  class in the project that has customized serde support, but it is not used 
> here.
> The fix seems to be to:
>  * Use an instance of StateFunObjectMapper to serialize the client properties 
> in RequestReplyFunctionBuilder
>  * Modify StateFunObjectMapper to both serialize and deserialize instances of 
> java.time.Duration (currently, only deserialization is supported)
> I've made these changes locally and it seems to fix the problem. Would you be 
> interested in a PR? Thanks.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25197) Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type `java.time.Duration` not supported by default: add Module "org.apache.flink.shaded.jackson2

2021-12-26 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17465400#comment-17465400
 ] 

Galen Warren commented on FLINK-25197:
--

PR created: https://github.com/apache/flink-statefun/pull/282

> Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type 
> `java.time.Duration` not supported by default: add Module 
> "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
>  to enable handling 
> ---
>
> Key: FLINK-25197
> URL: https://issues.apache.org/jira/browse/FLINK-25197
> Project: Flink
>  Issue Type: Bug
>  Components: Stateful Functions
>Affects Versions: statefun-3.1.0
>Reporter: Galen Warren
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available
> Fix For: statefun-3.1.0
>
>
> When using RequestReplyFunctionBuilder to build a stateful functions job, the 
> job fails at runtime with:
> Java 8 date/time type `java.time.Duration` not supported by default: add 
> Module 
> "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
>  to enable handling 
> It appears this is because, in 
> [RequestReplyFunctionBuilder::transportClientPropertiesAsObjectNode|https://github.com/apache/flink-statefun/blob/b4ba9547b8f0105a28544fd28a5e0433666e9023/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java#L127],
>  a default instance of ObjectMapper is used to serialize the client 
> properties, which now include a java.time.Duration. There is a 
> [StateFunObjectMapper|https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java]
>  class in the project that has customized serde support, but it is not used 
> here.
> The fix seems to be to:
>  * Use an instance of StateFunObjectMapper to serialize the client properties 
> in RequestReplyFunctionBuilder
>  * Modify StateFunObjectMapper to both serialize and deserialize instances of 
> java.time.Duration (currently, only deserialization is supported)
> I've made these changes locally and it seems to fix the problem. Would you be 
> interested in a PR? Thanks.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25197) Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type `java.time.Duration` not supported by default: add Module "org.apache.flink.shaded.jackson2.c

2021-12-06 Thread Galen Warren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Galen Warren updated FLINK-25197:
-
Description: 
When using RequestReplyFunctionBuilder to build a stateful functions job, the 
job fails at runtime with:

Java 8 date/time type `java.time.Duration` not supported by default: add Module 
"org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
 to enable handling 

It appears this is because, in 
[RequestReplyFunctionBuilder::transportClientPropertiesAsObjectNode|https://github.com/apache/flink-statefun/blob/b4ba9547b8f0105a28544fd28a5e0433666e9023/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java#L127],
 a default instance of ObjectMapper is used to serialize the client properties, 
which now include a java.time.Duration. There is a 
[StateFunObjectMapper|https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java]
 class in the project that has customized serde support, but it is not used 
here.

The fix seems to be to:
 * Use an instance of StateFunObjectMapper to serialize the client properties 
in RequestReplyFunctionBuilder
 * Modify StateFunObjectMapper to both serialize and deserialize instances of 
java.time.Duration (currently, only deserialization is supported)

I've made these changes locally and it seems to fix the problem. Would you be 
interested in a PR? Thanks.

 

  was:
When using RequestReplyFunctionBuilder to build a stateful functions job, the 
job fails at runtime with:

Java 8 date/time type `java.time.Duration` not supported by default: add Module 
"org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
 to enable handling 

It appears this is because, in 
[RequestReplyFunctionBuilder::transportClientPropertiesAsObjectNode|https://github.com/apache/flink-statefun/blob/b4ba9547b8f0105a28544fd28a5e0433666e9023/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java#L127],
 a default instance of ObjectMapper is used to serialize the client properties, 
which now include a java.time.Duration. There is a 
[StateFunObjectMapper|https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java]
 class in the project that has customized serde support, but it is not used 
here.

The fix seems to be to:
 * Use an instance of StateFunObjectMapper to serialize the client properties 
in RequestReplyFunctionBuilder
 * Modify StateFunObjecdtMapper to both serialize and deserialize instances of 
java.time.Duration (currently, only deserialization is supported)

I've made these changes locally and it seems to fix the problem. Would you be 
interested in a PR? Thanks.

 


> Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type 
> `java.time.Duration` not supported by default: add Module 
> "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
>  to enable handling 
> ---
>
> Key: FLINK-25197
> URL: https://issues.apache.org/jira/browse/FLINK-25197
> Project: Flink
>  Issue Type: Bug
>  Components: Stateful Functions
>Affects Versions: statefun-3.1.0
>Reporter: Galen Warren
>Priority: Major
> Fix For: statefun-3.1.0
>
>
> When using RequestReplyFunctionBuilder to build a stateful functions job, the 
> job fails at runtime with:
> Java 8 date/time type `java.time.Duration` not supported by default: add 
> Module 
> "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
>  to enable handling 
> It appears this is because, in 
> [RequestReplyFunctionBuilder::transportClientPropertiesAsObjectNode|https://github.com/apache/flink-statefun/blob/b4ba9547b8f0105a28544fd28a5e0433666e9023/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java#L127],
>  a default instance of ObjectMapper is used to serialize the client 
> properties, which now include a java.time.Duration. There is a 
> [StateFunObjectMapper|https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java]
>  class in the project that has customized serde support, but it is not used 
> 

[jira] [Created] (FLINK-25197) Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type `java.time.Duration` not supported by default: add Module "org.apache.flink.shaded.jackson2.c

2021-12-06 Thread Galen Warren (Jira)
Galen Warren created FLINK-25197:


 Summary: Using Statefun RequestReplyFunctionBuilder fails with 
Java 8 date/time type `java.time.Duration` not supported by default: add Module 
"org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
 to enable handling 
 Key: FLINK-25197
 URL: https://issues.apache.org/jira/browse/FLINK-25197
 Project: Flink
  Issue Type: Bug
  Components: Stateful Functions
Affects Versions: statefun-3.1.0
Reporter: Galen Warren
 Fix For: statefun-3.1.0


When using RequestReplyFunctionBuilder to build a stateful functions job, the 
job fails at runtime with:

Java 8 date/time type `java.time.Duration` not supported by default: add Module 
"org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
 to enable handling 

It appears this is because, in 
[RequestReplyFunctionBuilder::transportClientPropertiesAsObjectNode|https://github.com/apache/flink-statefun/blob/b4ba9547b8f0105a28544fd28a5e0433666e9023/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java#L127],
 a default instance of ObjectMapper is used to serialize the client properties, 
which now include a java.time.Duration. There is a 
[StateFunObjectMapper|https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java]
 class in the project that has customized serde support, but it is not used 
here.

The fix seems to be to:
 * Use an instance of StateFunObjectMapper to serialize the client properties 
in RequestReplyFunctionBuilder
 * Modify StateFunObjecdtMapper to both serialize and deserialize instances of 
java.time.Duration (currently, only deserialization is supported)

I've made these changes locally and it seems to fix the problem. Would you be 
interested in a PR? Thanks.
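
For context, below is a minimal sketch of the generic Jackson mechanism the error message points to: registering the JSR-310 module so that java.time.Duration can be serialized. The actual fix described above goes through StateFunObjectMapper and the shaded Jackson packages rather than plain Jackson:

{code:java}
// Minimal sketch of the JSR-310 module registration the error message asks for.
// The real fix uses StateFunObjectMapper / shaded Jackson classes instead of plain Jackson.
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import java.time.Duration;

public class DurationSerdeSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule()); // enables java.time.* handling

        String json = mapper.writeValueAsString(Duration.ofSeconds(30));
        System.out.println(json); // numeric seconds by default, or "PT30S" if timestamp output is disabled

        Duration roundTrip = mapper.readValue(json, Duration.class);
        System.out.println(roundTrip); // PT30S
    }
}
{code}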

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-08-12 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17398108#comment-17398108
 ] 

Galen Warren commented on FLINK-11838:
--

I know this has taken a long time. I'll have the code in before the upcoming 
1.14 freeze.

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, stale-assigned, usability
> Fix For: 1.14.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22757) Update GCS documentation

2021-05-23 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350139#comment-17350139
 ] 

Galen Warren commented on FLINK-22757:
--

In case it's relevant here, I'm working on a 
[PR|https://github.com/apache/flink/pull/15599] related to GCS support in 
Flink. There will be some new documentation as part of this.

> Update GCS documentation
> 
>
> Key: FLINK-22757
> URL: https://issues.apache.org/jira/browse/FLINK-22757
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Ankush Khanna
>Priority: Minor
>
> Currently, GCS filesystem documentation points to 
> [https://cloud.google.com/dataproc.] This does cover the correct way to 
> connect to GCS. 
> Following from this [blog 
> post|https://www.ververica.com/blog/getting-started-with-da-platform-on-google-kubernetes-engine]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19481) Add support for a flink native GCS FileSystem

2021-05-10 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17342087#comment-17342087
 ] 

Galen Warren commented on FLINK-19481:
--

I wanted to check in here. Should I wait until this question is resolved before 
proceeding with the PR? 

Personally, my preference would be to see Flink HadoopFileSystem + 
GoogleHadoopFileSystem as at least _an_ option for the file system 
implementation, just because those components seem to be well established. I'm 
not opposed to an alternate implementation, though, as has been done with S3, 
for example. If that's the path we're going down, it might mean some changes 
for the code in the PR I'm working on, hence the question.

 

> Add support for a flink native GCS FileSystem
> -
>
> Key: FLINK-19481
> URL: https://issues.apache.org/jira/browse/FLINK-19481
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, FileSystems
>Affects Versions: 1.12.0
>Reporter: Ben Augarten
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Currently, GCS is supported but only by using the hadoop connector[1]
>  
> The objective of this improvement is to add support for checkpointing to 
> Google Cloud Storage with the Flink File System,
>  
> This would allow the `gs://` scheme to be used for savepointing and 
> checkpointing. Long term, it would be nice if we could use the GCS FileSystem 
> as a source and sink in flink jobs as well. 
>  
> Long term, I hope that implementing a flink native GCS FileSystem will 
> simplify usage of GCS because the hadoop FileSystem ends up bringing in many 
> unshaded dependencies.
>  
> [1] 
> [https://github.com/GoogleCloudDataproc/hadoop-connectors|https://github.com/GoogleCloudDataproc/hadoop-connectors)]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19481) Add support for a flink native GCS FileSystem

2021-05-03 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17338418#comment-17338418
 ] 

Galen Warren commented on FLINK-19481:
--

Hi all, I'm the author of the other 
[PR|https://github.com/apache/flink/pull/15599] that relates to Google Cloud 
Storage. [~xintongsong] has been working with me on this.

The main goal of my PR is to add support for the RecoverableWriter interface, 
so that one can write to GCS via a StreamingFileSink. The file system support 
goes through the Hadoop stack, as noted above, using Google's [cloud storage 
connector|https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage].

I have not personally had problems using the GCS connector and the Hadoop stack 
– it seems to write check/savepoints properly. I also use it to write job 
manager HA data to GCS, which seems to work fine.

However, if we do want to support a native implementation in addition to the 
Hadoop-based one, we could approach it similarly to what has been done for S3, 
i.e. have a shared base project (flink-gs-fs-base?) and then projects for each 
of the implementations ( flink-gs-fs-hadoop and flink-gs-fs-native?). The 
recoverable-writer code could go into the shared project so that both of the 
implementations could use it (assuming that the native implementation doesn't 
already have a recoverable-writer implementation).

I'll defer to the Flink experts on whether that's a worthwhile effort or not. 
At this point, from my perspective, it wouldn't be that much work to rework the 
project structure to support this.

 

> Add support for a flink native GCS FileSystem
> -
>
> Key: FLINK-19481
> URL: https://issues.apache.org/jira/browse/FLINK-19481
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, FileSystems
>Affects Versions: 1.12.0
>Reporter: Ben Augarten
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Currently, GCS is supported but only by using the hadoop connector[1]
>  
> The objective of this improvement is to add support for checkpointing to 
> Google Cloud Storage with the Flink File System,
>  
> This would allow the `gs://` scheme to be used for savepointing and 
> checkpointing. Long term, it would be nice if we could use the GCS FileSystem 
> as a source and sink in flink jobs as well. 
>  
> Long term, I hope that implementing a flink native GCS FileSystem will 
> simplify usage of GCS because the hadoop FileSystem ends up bringing in many 
> unshaded dependencies.
>  
> [1] 
> [https://github.com/GoogleCloudDataproc/hadoop-connectors|https://github.com/GoogleCloudDataproc/hadoop-connectors)]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-04-21 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17326990#comment-17326990
 ] 

Galen Warren commented on FLINK-11838:
--

PR is still active.

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, stale-assigned, usability
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-04-13 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17320363#comment-17320363
 ] 

Galen Warren commented on FLINK-11838:
--

[~xintongsong],

PR created: [https://github.com/apache/flink/pull/15599]

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-04-07 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17316376#comment-17316376
 ] 

Galen Warren commented on FLINK-11838:
--

Working on the new PR now ... coming soon.

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-11838) Create RecoverableWriter for GCS

2021-04-02 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17314079#comment-17314079
 ] 

Galen Warren edited comment on FLINK-11838 at 4/2/21, 8:54 PM:
---

[~xintongsong] – actually, is that the right thing to do, start a new PR? I'm 
thinking that might be the cleanest way to proceed, since we never really used 
the old one that I created prematurely, and the info in the old one is out of 
date, but I'll follow your guidance here on what to do.

Old PR is [here|https://github.com/apache/flink/pull/14875]


was (Author: galenwarren):
[~xintongsong] – actually, is that the right thing to do, start a new PR? I'm 
thinking that might be the cleanest way to proceed, since we never really used 
the old one that I created prematurely, and the info in the old one is out of 
date, but I'll follow your guidance here on what to do.

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-04-02 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17314079#comment-17314079
 ] 

Galen Warren commented on FLINK-11838:
--

[~xintongsong] – actually, is that the right thing to do, start a new PR? I'm 
thinking that might be the cleanest way to proceed, since we never really used 
the old one that I created prematurely, and the info in the old one is out of 
date, but I'll follow your guidance here on what to do.

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-04-02 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17314067#comment-17314067
 ] 

Galen Warren commented on FLINK-11838:
--

Hi [~xintongsong] – sorry for the long delay. I plan to create a new PR and 
upload some code this weekend, my other work has let up a bit and I have some 
time to look at this. I'll post the link here when it's ready.

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-03-17 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17303537#comment-17303537
 ] 

Galen Warren commented on FLINK-11838:
--

Not that I'm aware of. Writing checkpoints/savepoints and HA data to GCS 
buckets works fine as-is, but to use StreamingFileSink, a RecoverableWriter 
implementation is required. That exists now for S3 but not GCS.

I have a local GCS RecoverableWriter implementation I'm using and which seems 
to be working, and I'm hoping to push the first part of the code for review by 
[~xintongsong] this weekend.

 

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-03-12 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300332#comment-17300332
 ] 

Galen Warren commented on FLINK-11838:
--

[~BerkayOzturk] No, it's not in a working state. Since that PR, [~xintongsong] 
and I refined the proposed implementation significantly. I'm hoping to get back 
to this very soon, I've had some work stuff come up that is taking all my time. 
Sorry about the delay!

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-03-04 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17295384#comment-17295384
 ] 

Galen Warren commented on FLINK-11838:
--

I just wanted to add an update here so this doesn't go stale – I should be able 
to push the first piece of the code in a PR in about a week, once this work 
project I'm on dies down a bit. So this is still coming, sorry for the delay.

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-22 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17288419#comment-17288419
 ] 

Galen Warren commented on FLINK-11838:
--

Hey, I just wanted to check in here. Sorry for the delay, some other work stuff 
has come up and I haven't been able to spend any time here. But I will still 
complete this, will hopefully get back to it toward the end of this week.

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-15 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17284896#comment-17284896
 ] 

Galen Warren commented on FLINK-11838:
--

Ah, OK – I see. It will take me a couple of days to get things organized, I'll 
post back here when I have something for you to look at. Thanks.

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-15 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17284751#comment-17284751
 ] 

Galen Warren commented on FLINK-11838:
--

That's fine on "compose on persist", I'll leave it out for now and supply it in 
a separate commit if we decide to go there.

This reminds me of a question I meant to ask, though. When we first started, 
you had some concern about the size of the original PR and suggested it be 
broken into parts. The new PR will be roughly the same size – if that's too 
big, any suggestion on how to break it into pieces? Should the unit tests be a 
separate commit?

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-14 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17284476#comment-17284476
 ] 

Galen Warren commented on FLINK-11838:
--

{quote}Maybe for the first step, it's good enough to simply do all the 
composing and deleting at the end. We can try to optimize it later if we indeed 
see a performance problem in composing and deleting the temporary blobs.
{quote}
I'm fine to go either way here. I've already put something together locally 
that allows for composing both at persist and commit, but it would be simple to 
revert to just doing it at commit. Maybe you can take a look when we get to the 
code phase to see what you think? If it's not obvious which is better, I 
suppose we could also control that – "compose on persist" – via a Flink option.

Are you comfortable with the approach now? If so, I'll work on getting the code 
together in order to update the PR.

Thanks!

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-12 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17283950#comment-17283950
 ] 

Galen Warren edited comment on FLINK-11838 at 2/12/21, 9:01 PM:


One more thought/question. While we can potentially compose temp blobs along 
the way, to avoid having to compose them all at commit time, it seems to me 
that we can't safely delete any of the temporary blobs along the way, because it's 
possible that we might restore to a checkpoint prior to some or all of the 
incremental compose operations having occurred. In that case, we'd have to 
repeat the compose operations, which means the underlying temp blobs would need 
to be there.

If that's right, then we'd necessarily have to wait to the end to delete the 
temp blobs. I was wondering, would it be allowable to do any of those delete 
operations in parallel? The coding 
[guidelines|https://flink.apache.org/contributing/code-style-and-quality-common.html#concurrency-and-threading]
 would seem to discourage this, but they don't outright prohibit it, so I 
thought I'd ask first.

EDIT: Nevermind regarding the parallelism question; I didn't notice that the 
GCS api provides a method to delete several blobs in a single call, I think 
that will suffice. 
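
For reference, the call I had in mind is the batch-delete overload on {{Storage}} (sketch only; the blob ids would come from the recoverable state tracked by the writer):

{code:java}
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import java.util.List;

class TempBlobCleanup {

  // Sketch only: delete all temporary blobs for a write in a single call;
  // the returned booleans indicate per-blob success.
  static List<Boolean> deleteAll(Storage storage, List<BlobId> tempBlobIds) {
    return storage.delete(tempBlobIds);
  }
}
{code}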


was (Author: galenwarren):
One more thought/question. While we can potentially compose temp blobs along 
the way, to avoid having to compose them all at commit time, it seems to me 
that we can't safely delete any of the temporary blobs along the way, because it's 
possible that we might restore to a checkpoint prior to some or all of the 
incremental compose operations having occurred. In that case, we'd have to 
repeat the compose operations, which means the underlying temp blobs would need 
to be there.

If that's right, then we'd necessarily have to wait to the end to delete the 
temp blobs. I was wondering, would it be allowable to do any of those delete 
operations in parallel? The coding 
[guidelines|https://flink.apache.org/contributing/code-style-and-quality-common.html#concurrency-and-threading]
 would seem to discourage this, but they don't outright prohibit it, so I 
thought I'd ask first.

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-12 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17283950#comment-17283950
 ] 

Galen Warren commented on FLINK-11838:
--

One more thought/question. While we can potentially compose temp blobs along 
the way, to avoid having to compose them all at commit time, it seems to me 
that we can't safely delete any of the temporary blobs along the way, because it's 
possible that we might restore to a checkpoint prior to some or all of the 
incremental compose operations having occurred. In that case, we'd have to 
repeat the compose operations, which means the underlying temp blobs would need 
to be there.

If that's right, then we'd necessarily have to wait to the end to delete the 
temp blobs. I was wondering, would it be allowable to do any of those delete 
operations in parallel? The coding 
[guidelines|https://flink.apache.org/contributing/code-style-and-quality-common.html#concurrency-and-threading]
 would seem to discourage this, but they don't outright prohibit it, so I 
thought I'd ask first.

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-12 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17283747#comment-17283747
 ] 

Galen Warren commented on FLINK-11838:
--

Hi [~xintongsong], sorry for the delay here, I had some other, unrelated work I 
had to focus on.

I like your idea of using {{WriteChannel}} for the uploads, but to close each 
one when {{RecoverableFsDataOutputStream.persist}} is called, so that we're not 
relying on {{WriteChannel}} for recoverability. {{WriteChannel}} allows one to 
control how frequently it flushes data via 
[setChunkSize|https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#setChunkSize-int-],
 perhaps we could expose this chunk size as a Flink option to give the user some 
control over the process, i.e. how much memory is used for buffering? It could 
be optional; not setting it would mean using the Google default.
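
For example, something along these lines (just a sketch; the Flink option that would supply the chunk size is hypothetical at this point):

{code:java}
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import java.util.OptionalInt;

class ChunkSizeSketch {

  // Sketch only: open a WriteChannel for a temp blob and apply a chunk size taken
  // from a (hypothetical) Flink option; if the option is unset, the Google client
  // default is used.
  static WriteChannel openWriter(Storage storage, BlobInfo tempBlob, OptionalInt chunkSizeBytes) {
    WriteChannel channel = storage.writer(tempBlob);
    // per the GCS docs, chunk sizes should be a multiple of 256 KiB
    chunkSizeBytes.ifPresent(channel::setChunkSize);
    return channel;
  }
}
{code}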

Yes, it would be straightforward to compose blobs at any point in the process, 
i.e. on commit and/or at {{persist}} calls along the way. Maybe we compose them 
on commit (of course) and also whenever {{persist}} is called when there are at 
least 32 temp blobs to be composed? That way, we spread the compose work out 
over time but also call {{compose}} as few times as possible, composing as many 
blobs as possible in each call, which seems like it would be the most efficient.

Shall we go with this approach?

 

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-09 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281769#comment-17281769
 ] 

Galen Warren commented on FLINK-11838:
--

Hi [~xintongsong] – a couple more thoughts here.

I think I understand why the code in 
[BlobWriteChannel|https://github.com/googleapis/java-storage/blob/master/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java]
 and 
[BaseWriteChannel|https://github.com/googleapis/java-core/blob/master/google-cloud-core/src/main/java/com/google/cloud/BaseWriteChannel.java]
 is more complicated than I expected; there is a 
[requirement|https://cloud.google.com/storage/docs/performing-resumable-uploads]
 in resumable uploads that all the uploaded chunks (except the last one) be a 
multiple of 256 KiB, and so the writers have to buffer data in order to meet 
that requirement. The RestorableState then, in general, contains some amount of 
buffered data, i.e. data that has been written but that has not yet been sent 
as part of an upload chunk.

Also, I came across a bit of a disturbing 
[thread|https://issuetracker.google.com/issues/137168102] from last year, where 
a GCS bug was being discussed that, essentially, caused in-progress resumable 
uploads to disappear and return 410 GONE errors. Such a failure would obviously 
be a big problem for the Flink use case we're talking about here. That thread 
claims this bug is fixed as of July 2020; however, it got me thinking about 
alternative implementations, especially since they provided this guidance:
{quote}To retry successfully, catching 500 and 410 errors is required and, as 
the official documentation recommends 
([https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone]), 
implementing a retry by starting a new session for the upload that received an 
unsuccessful status code but still needs uploading. 
{quote}
 which we would be unable to follow.

So here's a different implementation idea. What if we didn't use resumable 
uploads at all, and instead just uploaded data in a series of temporary blobs, 
each of which would be uploaded via a normal, nonresumable upload. The 
[RecoverableFsDataOutputStream|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.html]
 could buffer written data and write temporary blobs at various points (flush, 
sync, persist, close, or even write, if the buffered size exceeds some 
threshold). In this approach, the resumable state would include a list of all 
the temporary blobs uploaded in association with this write, and on commit the 
temporary files would be combined into one file and copied to the final blob 
location.

To combine the temp blobs into the final blob, we could use the 
[compose|https://cloud.google.com/storage/docs/composing-objects] feature of 
GCS, which allows combining up to 32 blobs into one in a single call, and which 
could compose an arbitrary number of blobs with a pretty simple algorithm.
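
Roughly, the algorithm I have in mind would fold the temp blobs together 32 at a time, e.g. (sketch only; the intermediate-blob naming is illustrative, and the intermediate blobs would also need to be cleaned up afterwards):

{code:java}
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class ComposeSketch {

  private static final int MAX_COMPOSE_SOURCES = 32;

  // Compose an arbitrary number of temp blobs (all in the same bucket) into the
  // final blob, using at most 32 sources per compose call.
  static Blob composeAll(Storage storage, String bucket, List<String> tempBlobNames, String finalName) {
    List<String> remaining = new ArrayList<>(tempBlobNames);
    while (remaining.size() > MAX_COMPOSE_SOURCES) {
      // fold the first 32 blobs into an intermediate blob, then carry it forward
      String intermediate = finalName + ".intermediate-" + UUID.randomUUID();
      storage.compose(
          Storage.ComposeRequest.newBuilder()
              .setTarget(BlobInfo.newBuilder(bucket, intermediate).build())
              .addSource(remaining.subList(0, MAX_COMPOSE_SOURCES))
              .build());
      List<String> next = new ArrayList<>();
      next.add(intermediate);
      next.addAll(remaining.subList(MAX_COMPOSE_SOURCES, remaining.size()));
      remaining = next;
    }
    return storage.compose(
        Storage.ComposeRequest.newBuilder()
            .setTarget(BlobInfo.newBuilder(bucket, finalName).build())
            .addSource(remaining)
            .build());
  }
}
{code}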

A few additional benefits of this approach, potentially:
 * Each temp blob write would be a normal, nonresumable upload with known 
contents and size at the time of upload, so we could use CRC checks as 
recommended [here|https://cloud.google.com/storage/docs/hashes-etags].
 * We'd sidestep the one-week limitation; the lifetime of temporary blobs could 
be made indefinite or managed via bucket policy
 * We would be in full control of the resumable state, so we'd avoid any issues 
related to Java serialization

In this approach, we'd probably need a Flink option to set the maximum size of 
a temporary blob, which would control how frequently temporary blobs would be 
created. Tuning this would involve a tradeoff between memory usage and 
checkpoint size on the Flink side vs. time to commit the writes via the 
recoverable writer, as I'd expect it to take longer to compose a larger number 
of smaller temporary blobs together into the final blob than it would a smaller 
number of larger temporary blobs.

Anyway, just a thought – let me know what you think. And enjoy the New Year! :)

 

 

 

 

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 

[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-08 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281481#comment-17281481
 ] 

Galen Warren commented on FLINK-11838:
--

Thanks, that all sounds reasonable. I'll take a look at the REST option. I'm 
going to be a bit tied up with some work-related stuff for the next couple of 
days so I might not get back until later in the week, but I'll definitely get 
back on this soon.

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.13.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-07 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17280587#comment-17280587
 ] 

Galen Warren commented on FLINK-11838:
--

Thanks. All of your initial assumptions are correct, though I might add that it 
is straightforward to serialize/deserialize a RestorableState if Java 
serialization is allowed (i.e. the object is guaranteed to implement 
Serializable). But I understand that's not preferred.

To answer your questions:

*Does "writing a blob to a temporary location" mean that the user always needs 
to configure a temporary location? How is the temporary location cleaned, say 
if they're never moved to the committed location?*

I was thinking to manage temporary locations through a couple of Flink options:
 * First, a "prefix" option, which would be a string to be applied in front of 
the supplied "final" blob name. This prefix could by default be something like 
".inprogress/".
 * Second, an option for the bucket to use for temp blobs, which would not be 
required. If not supplied, the same bucket would be used for temp blobs as for 
the associated final blobs.

I was also planning on appending a UUID string to the end of temp blob 
locations, in order to guarantee their uniqueness during the temp-blob writing 
phase, in case somehow multiple writes to the same final blob were in progress 
at the same time.

So, with the defaults, a write to a blob {{gs://final_bucket/foo/bar}} would 
use a temp blob location of, say, 
{{gs://final_bucket/.inprogress/foo/bar/1157f422-32af-4e32-a797-2a0a05f28ecf}}. 
The prefix could be configured via the first option; also, if the user wanted 
to write all temp blobs to a "temp_bucket" bucket, that bucket could be 
specified via the second option, yielding 
{{gs://temp_bucket/.inprogress/foo/bar/1157f422-32af-4e32-a797-2a0a05f28ecf}}.
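
To make the naming concrete, I'm picturing something like this (pure sketch; the helper and the option plumbing are made up for illustration):

{code:java}
import com.google.cloud.storage.BlobId;
import java.util.UUID;
import javax.annotation.Nullable;

class TempBlobNaming {

  // Sketch only: derive the temp blob location from the final blob location,
  // the configurable prefix (e.g. ".inprogress/"), and an optional temp bucket.
  static BlobId tempBlobIdFor(BlobId finalBlobId, String prefix, @Nullable String tempBucket) {
    String bucket = tempBucket != null ? tempBucket : finalBlobId.getBucket();
    String name = prefix + finalBlobId.getName() + "/" + UUID.randomUUID();
    return BlobId.of(bucket, name);
  }
}
{code}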

As for cleaning the temporary blobs, is that what 
[RecoverableWriter.cleanupRecoverableState|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html#cleanupRecoverableState-org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable-]
 does? I was thinking to clean up any temporary blobs there. Beyond that, if a 
separate bucket for temporary blobs were used, I suppose one could apply an 
[object lifecycle-management 
rule|https://cloud.google.com/storage/docs/lifecycle] that would delete blobs 
after some period of time. These rules look to be applicable only at the bucket 
level, so this would only work if a separate bucket were used just for 
temporary blobs.

 

*Per [this doc|https://cloud.google.com/storage/docs/resumable-uploads], a 
resumable upload must be completed within a week. This could be surprising for 
the users, if they try to restore a job from checkpoints/savepoints after 
pausing for more than one week.*

Yes, I would propose to disclose this limitation via a disclaimer, similar to 
the one used for S3:
{quote}Important Note 2: To guarantee exactly-once semantics while being 
efficient, the {{StreamingFileSink}} uses the [Multi-part 
Upload|https://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html] 
feature of S3 (MPU from now on). This feature allows to upload files in 
independent chunks (thus the "multi-part") which can be combined into the 
original file when all the parts of the MPU are successfully uploaded. For 
inactive MPUs, S3 supports a bucket lifecycle rule that the user can use to 
abort multipart uploads that don't complete within a specified number of days 
after being initiated. This implies that if you set this rule aggressively and 
take a savepoint with some part-files being not fully uploaded, their 
associated MPUs may time-out before the job is restarted. This will result in 
your job not being able to restore from that savepoint as the pending 
part-files are no longer there and Flink will fail with an exception as it 
tries to fetch them and fails.
{quote}
Admittedly, it does seem that S3 provides more configuration options here than 
GCS. It would be nice if the week limit were configurable, but it doesn't seem 
to be, based on my read.

 

*Relying on Java serialization means depending our compatibility on the 
compatibility of GCS, which should be avoid if possible. Would it be possible 
to directly work with the REST API and session URI? IIUC this is how the write 
channel internally works.*

I'd need to look into it more closely, but yes, I think this could be possible. 
I think we'd wind up reimplementing much of what is done in 
[BlobWriteChannel|https://github.com/googleapis/java-storage/blob/master/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java]
 and 
[BaseWriteChannel|https://github.com/googleapis/java-core/blob/master/google-cloud-core/src/main/java/com/google/cloud/BaseWriteChannel.java],
 which I suppose I was thinking would be good to avoid. The code in the 
BlobWriteChannel 

[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-05 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17279694#comment-17279694
 ] 

Galen Warren commented on FLINK-11838:
--

Thanks [~xtsong]! I really appreciate your help on this. Sorry if I jumped the 
gun on the PR, I'm happy to follow the process you've outlined, and if it makes 
more sense to ultimately submit the PR in multiple pieces instead of one, 
that's fine with me too.

So it would seem that the next step would be to discuss the proposed design 
here. I'll take a crack at that.



At a high level, the goal here is to implement RecoverableWriter for GCS, in 
order to be able to use StreamingFileSink to write to GCS. In Flink, 
recoverable writers are created by calling 
[FileSystem.createRecoverableWriter|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/FileSystem.html#createRecoverableWriter--],
 so we will also need an implementation of org.apache.flink.core.fs.FileSystem 
for GCS in order to expose the recoverable writer to Flink.

The org.apache.flink.core.fs.FileSystem implementation is straightforward. 
Google provides an implementation of org.apache.hadoop.fs.FileSystem for GCS 
via 
[GoogleHadoopFileSystem|https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystem.java],
 which can already be used in Flink in other contexts, e.g. for checkpoints. 
Flink provides 
[HadoopFileSystem|https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.html],
 which implements org.apache.flink.core.fs.FileSystem in terms of an underlying 
org.apache.hadoop.fs.FileSystem. So, the proposed FileSystem for GCS would 
extend HadoopFileSystem, constructing it with an instance of 
GoogleHadoopFileSystem. This new FileSystem class would also override 
createRecoverableWriter to return a RecoverableWriter implementation for GCS. 

We also need an implementation of 
[FileSystemFactory|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/FileSystemFactory.html]
 and register it with SPI to expose the GCS FileSystem to Flink.
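
In code, the wiring described above would look roughly like this (sketch only; GSFileSystem and GSFileSystemFactory are placeholder names, configuration handling is omitted, and the recoverable writer itself is not shown):

{code:java}
import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
import java.io.IOException;
import java.net.URI;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;

// Flink FileSystem for gs://, delegating to Google's Hadoop connector
class GSFileSystem extends HadoopFileSystem {

  GSFileSystem(GoogleHadoopFileSystem googleHadoopFileSystem) {
    super(googleHadoopFileSystem);
  }

  @Override
  public RecoverableWriter createRecoverableWriter() throws IOException {
    // placeholder: this is where the GCS RecoverableWriter described below
    // would be constructed
    throw new UnsupportedOperationException("GCS RecoverableWriter not sketched here");
  }
}

// Factory registered via SPI, i.e. a
// META-INF/services/org.apache.flink.core.fs.FileSystemFactory entry
class GSFileSystemFactory implements FileSystemFactory {

  @Override
  public String getScheme() {
    return "gs";
  }

  @Override
  public FileSystem create(URI fsUri) throws IOException {
    GoogleHadoopFileSystem googleHadoopFileSystem = new GoogleHadoopFileSystem();
    googleHadoopFileSystem.initialize(fsUri, new org.apache.hadoop.conf.Configuration());
    return new GSFileSystem(googleHadoopFileSystem);
  }
}
{code}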

So, next is the interesting part – the GCS 
[RecoverableWriter|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html]
 implementation. At a high level, RecoverableWriter allows one to create an 
output stream, write bytes to that stream, persist and recover the state of 
that stream, and ultimately commit the write operation or abort it. In GCS, I 
propose to do recoverable writes in two steps:
 * First, write a blob to a temporary location using the [resumable 
uploads|https://cloud.google.com/storage/docs/resumable-uploads] feature of 
GCS. This allows for blobs to be written in chunks over a potentially long 
period of time (up to one week, per the docs). Importantly, it also allows for 
the write state to be persisted and recovered, via 
[WriteChannel.capture|https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#capture--]
 and 
[RestorableState.restore|https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/RestorableState.html],
 which we'll use to implement the persist/recover functionality in 
RecoverableWriter.
 * Second, commit the write operation by copying the temporary blob to the 
"final" blob location, i.e. the one specified in the initial call to 
RecoverableWriter.open, and deleting the temporary blob. In the event of an 
aborted upload, the cleanup would consist of deleting just the temporary blob.

So, in this approach, the recoverable state from Flink's perspective (i.e. 
CommitRecoverable and ResumeRecoverable) would include:
 * The RestorableState returned from WriteChannel
 * The write position (we have to keep track of this ourselves, because 
WriteChannel doesn't expose this)
 * The locations of the temporary and final blobs, so that we can ultimately 
commit or cleanup the overall operation

That's basically it at a high level.

I do want to point out one possible conflict with the Flink coding guidelines, 
though, to get your thoughts. The guidelines say – very emphatically! –  not to 
use Java serialization for anything. In the GCS case, the RestorableState that 
is returned from WriteChannel.capture is an object that implements Serializable 
but is otherwise opaque. This object does need to be serialized/deserialized as 
part of the RecoverableWriter implementation, and it's not clear to me how to 
do that except by using Java serialization.
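
Concretely, the piece I don't see a way around looks roughly like this (sketch only), where the state comes from {{WriteChannel.capture}} at persist time and is turned back into a channel via {{RestorableState.restore}} on recovery:

{code:java}
import com.google.cloud.RestorableState;
import com.google.cloud.WriteChannel;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

class RestorableStateSerde {

  // RestorableState is only guaranteed to be Serializable, so persisting it
  // appears to require plain Java serialization.
  static byte[] serialize(RestorableState<WriteChannel> state) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(state);
    }
    return bytes.toByteArray();
  }

  @SuppressWarnings("unchecked")
  static RestorableState<WriteChannel> deserialize(byte[] bytes)
      throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return (RestorableState<WriteChannel>) in.readObject();
    }
  }
}
{code}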



I'll stop there for now, please let me know if this is the sort of information 
you're looking for here. I'm happy to drill into any area in more detail if 
you'd like, just let me know. 

[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-04 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17279023#comment-17279023
 ] 

Galen Warren commented on FLINK-11838:
--

I have submitted a new [PR|https://github.com/apache/flink/pull/14875] for 
this, unrelated to the prior ones, and [~trohrmann] suggested I reuse this Jira 
ticket.

 

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Fokko Driesprong
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18810) Golang remote functions SDK

2020-09-23 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17201123#comment-17201123
 ] 

Galen Warren commented on FLINK-18810:
--

I'd also love to see a Golang SDK and would be willing to contribute here. Will 
check out [https://github.com/sjwiesman/statefun-go] ...

> Golang remote functions SDK
> ---
>
> Key: FLINK-18810
> URL: https://issues.apache.org/jira/browse/FLINK-18810
> Project: Flink
>  Issue Type: New Feature
>  Components: Stateful Functions
>Reporter: Francesco Guardiani
>Priority: Trivial
>
> Hi,
> I was wondering if there's already some WIP for a Golang SDK to create remote 
> functions. If not, I'm willing to give it a try.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions

2020-09-15 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17196268#comment-17196268
 ] 

Galen Warren commented on FLINK-19176:
--

I just created a PR: [https://github.com/apache/flink-statefun/pull/152].

> Support ScalaPB as a message payload serializer in Stateful Functions
> -
>
> Key: FLINK-19176
> URL: https://issues.apache.org/jira/browse/FLINK-19176
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Affects Versions: 2.0.0
>Reporter: Galen Warren
>Priority: Major
>  Labels: pull-request-available
> Fix For: statefun-2.1.0
>
>
> Currently, Stateful Functions supports four options for serialization of 
> message payloads:
>  * Protobuf (based on code generated for Java)
>  * Kryo
>  * Multilanguage 
>  * Raw
> This proposal is to add a fifth option to this list, to support serialization 
> of message payloads based on [ScalaPB |[https://scalapb.github.io/docs/]]. 
> This would allow Scala developers to use ScalaPB-generated classes as message 
> types in Stateful Functions directly as message types, without having to 
> convert to/from code generated for Java and/or raw byte[] messages.
> This would be a simple implementation, as there is already a 
> [MessagePayloadSerializer 
> |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java]]
>  interface that is implemented for each of the existing serialization 
> methods; supporting ScalaPB would require a new class implementing 
> MessagePayloadSerializer in terms of ScalaPB-generated code, and a means to 
> select it, by adding a new value to the [MessageFactoryType 
> |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java]]enumeration.
>  Plus testing :)
> I've done this already locally, the implementation is very similar to the 
> existing [MessagePayloadSerializerPb 
> |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java]]implementation.
>  It uses a reference to ScalaPB in "provided" scope.
> Would you be interested in a pull request to add this? Primary benefit is to 
> make it easy to use Stateful Functions in a Scala environment. Thanks.
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions

2020-09-14 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17195635#comment-17195635
 ] 

Galen Warren commented on FLINK-19176:
--

Thanks! I'll get to work on that.

> Support ScalaPB as a message payload serializer in Stateful Functions
> -
>
> Key: FLINK-19176
> URL: https://issues.apache.org/jira/browse/FLINK-19176
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Affects Versions: 2.0.0
>Reporter: Galen Warren
>Priority: Major
> Fix For: statefun-2.1.0
>
>
> Currently, Stateful Functions supports four options for serialization of 
> message payloads:
>  * Protobuf (based on code generated for Java)
>  * Kryo
>  * Multilanguage 
>  * Raw
> This proposal is to add a fifth option to this list, to support serialization 
> of message payloads based on [ScalaPB |[https://scalapb.github.io/docs/]]. 
> This would allow Scala developers to use ScalaPB-generated classes as message 
> types in Stateful Functions directly as message types, without having to 
> convert to/from code generated for Java and/or raw byte[] messages.
> This would be a simple implementation, as there is already a 
> [MessagePayloadSerializer 
> |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java]]
>  interface that is implemented for each of the existing serialization 
> methods; supporting ScalaPB would require a new class implementing 
> MessagePayloadSerializer in terms of ScalaPB-generated code, and a means to 
> select it, by adding a new value to the [MessageFactoryType 
> |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java]]enumeration.
>  Plus testing :)
> I've done this already locally, the implementation is very similar to the 
> existing [MessagePayloadSerializerPb 
> |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java]]implementation.
>  It uses a reference to ScalaPB in "provided" scope.
> Would you be interested in a pull request to add this? Primary benefit is to 
> make it easy to use Stateful Functions in a Scala environment. Thanks.
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions

2020-09-14 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17195564#comment-17195564
 ] 

Galen Warren commented on FLINK-19176:
--

Thanks [~igal].

Yep, that's almost exactly how I approached it! I thought of MessageFactoryKey 
as needing to contain two pieces of information – the existing 
MessageFactoryType (with one new value added to the enumeration, 
USES_CUSTOM_PAYLOADS) and a string field (customPayloadSerializerName) that 
contains the name of the custom payload serializer class, which would be used 
only with USES_CUSTOM_PAYLOADS (a rough sketch of this shape follows below).

However, one question. Several classes that currently have MessageFactoryType as 
a field are serializable, so changing the type of that field to MessageFactoryKey 
would break serialization backward compatibility if nothing else were done. 
MessageTypeSerializer is an example of such a class. I can see a few options 
for how to proceed:
 * Just replace MessageFactoryType with MessageFactoryKey anyway, bump the 
serialization uid, and accept the incompatibility
 * Replace MessageFactoryType with MessageFactoryKey, bump the serialization 
uid, and implement custom Java serialization so that the new versions of the 
classes can deserialize data written by the old versions of the classes with 
the different field
 * Use MessageFactoryKey for parameters everywhere but, in serializable classes 
that currently contain MessageFactoryType, keep MessageFactoryType as a 
field and also add customPayloadSerializerName as a field. This should allow 
both the new and old versions of the class to read the same serialized data and 
not require a bump of the serialization uid.

Does that sound right, and if so, any recommendations on how to proceed? Thanks.
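
For concreteness, here is a rough sketch of the shape I have in mind for 
MessageFactoryKey. The names and details are placeholders of my own, not the 
final code; only MessageFactoryType and the proposed USES_CUSTOM_PAYLOADS value 
come from the discussion above.
{noformat}
// Rough sketch only -- names and details are placeholders, not the final code.
package org.apache.flink.statefun.flink.core.message;

import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;

public final class MessageFactoryKey implements Serializable {

  private static final long serialVersionUID = 1L;

  private final MessageFactoryType type;

  // Only meaningful for USES_CUSTOM_PAYLOADS; null for the built-in serializers.
  private final String customPayloadSerializerClassName;

  private MessageFactoryKey(MessageFactoryType type, String customPayloadSerializerClassName) {
    this.type = Objects.requireNonNull(type);
    this.customPayloadSerializerClassName = customPayloadSerializerClassName;
  }

  public static MessageFactoryKey forType(
      MessageFactoryType type, String customPayloadSerializerClassName) {
    return new MessageFactoryKey(type, customPayloadSerializerClassName);
  }

  public MessageFactoryType getType() {
    return type;
  }

  public Optional<String> getCustomPayloadSerializerClassName() {
    return Optional.ofNullable(customPayloadSerializerClassName);
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof MessageFactoryKey)) {
      return false;
    }
    MessageFactoryKey that = (MessageFactoryKey) other;
    return this.type == that.type
        && Objects.equals(
            this.customPayloadSerializerClassName, that.customPayloadSerializerClassName);
  }

  @Override
  public int hashCode() {
    return Objects.hash(type, customPayloadSerializerClassName);
  }
}
{noformat}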

 

> Support ScalaPB as a message payload serializer in Stateful Functions
> -
>
> Key: FLINK-19176
> URL: https://issues.apache.org/jira/browse/FLINK-19176
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Affects Versions: 2.0.0
>Reporter: Galen Warren
>Priority: Major
> Fix For: statefun-2.1.0
>
>
> Currently, Stateful Functions supports four options for serialization of 
> message payloads:
>  * Protobuf (based on code generated for Java)
>  * Kryo
>  * Multilanguage 
>  * Raw
> This proposal is to add a fifth option to this list, to support serialization 
> of message payloads based on [ScalaPB |[https://scalapb.github.io/docs/]]. 
> This would allow Scala developers to use ScalaPB-generated classes as message 
> types in Stateful Functions directly as message types, without having to 
> convert to/from code generated for Java and/or raw byte[] messages.
> This would be a simple implementation, as there is already a 
> [MessagePayloadSerializer 
> |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java]]
>  interface that is implemented for each of the existing serialization 
> methods; supporting ScalaPB would require a new class implementing 
> MessagePayloadSerializer in terms of ScalaPB-generated code, and a means to 
> select it, by adding a new value to the [MessageFactoryType 
> |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java]]enumeration.
>  Plus testing :)
> I've done this already locally, the implementation is very similar to the 
> existing [MessagePayloadSerializerPb 
> |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java]]implementation.
>  It uses a reference to ScalaPB in "provided" scope.
> Would you be interested in a pull request to add this? Primary benefit is to 
> make it easy to use Stateful Functions in a Scala environment. Thanks.
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions

2020-09-12 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17194832#comment-17194832
 ] 

Galen Warren commented on FLINK-19176:
--

Hi [~igal] - I've forked the project and taken a crack at the approach based on 
a pluggable MessagePayloadSerializer. It seems to work in my project. I haven't 
fully run through the unit tests yet; before I do, I thought I'd check if you 
(or someone) would like to validate the approach. Should I create a pull 
request?

Thanks,

Galen

> Support ScalaPB as a message payload serializer in Stateful Functions
> -
>
> Key: FLINK-19176
> URL: https://issues.apache.org/jira/browse/FLINK-19176
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Affects Versions: 2.0.0
>Reporter: Galen Warren
>Priority: Major
> Fix For: statefun-2.1.0
>
>
> Currently, Stateful Functions supports four options for serialization of 
> message payloads:
>  * Protobuf (based on code generated for Java)
>  * Kryo
>  * Multilanguage 
>  * Raw
> This proposal is to add a fifth option to this list, to support serialization 
> of message payloads based on [ScalaPB |[https://scalapb.github.io/docs/]]. 
> This would allow Scala developers to use ScalaPB-generated classes as message 
> types in Stateful Functions directly as message types, without having to 
> convert to/from code generated for Java and/or raw byte[] messages.
> This would be a simple implementation, as there is already a 
> [MessagePayloadSerializer 
> |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java]]
>  interface that is implemented for each of the existing serialization 
> methods; supporting ScalaPB would require a new class implementing 
> MessagePayloadSerializer in terms of ScalaPB-generated code, and a means to 
> select it, by adding a new value to the [MessageFactoryType 
> |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java]]enumeration.
>  Plus testing :)
> I've done this already locally, the implementation is very similar to the 
> existing [MessagePayloadSerializerPb 
> |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java]]implementation.
>  It uses a reference to ScalaPB in "provided" scope.
> Would you be interested in a pull request to add this? Primary benefit is to 
> make it easy to use Stateful Functions in a Scala environment. Thanks.
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions

2020-09-11 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17194243#comment-17194243
 ] 

Galen Warren commented on FLINK-19176:
--

Thanks for the quick reply! A few comments/questions:
{quote}Unfortunately adding ScalaPB payload serializer to the project directly 
(even with runtime scope) will be very hard to get a consensus for, since 
maintaining different Scala version is problematic and we have experienced that 
in Flink in many different instances.
{quote}
Obviously, I'll defer to your judgment as to what could achieve consensus here. 
It does look to me as though StateFun already has a Scala dependency; the 
additional ScalaPB dependency that would be added here (in statefun-flink-core) 
would just reference the existing Scala dependency defined in statefun-parent, 
i.e.: 
{noformat}
<dependency>
    <groupId>com.thesamet.scalapb</groupId>
    <artifactId>scalapb-runtime_${scala.binary.version}</artifactId>
    <version>${scalapb.version}</version>
    <scope>provided</scope>
</dependency>
{noformat}
So it wouldn't be adding a new Scala dependency, but perhaps even referencing 
the existing Scala dependency is undesirable. I do have a working version of 
this locally; I'd be happy to share the code with you if you want to see 
exactly what it might look like.
{quote}I think that the long term solution should be a Scala native remote SDK, 
and I would be perfectly happy to help kicking off this effort if you are 
interested :)
{quote}
This sounds interesting, but I don't quite understand what it would entail. 
Would you mind elaborating a bit?
{quote}Meanwhile, if you would like to use ScalaPB for the embedded functions, 
then we would need to support pluggable MessagePayloadSerializer, that would be 
picked up in runtime from the class-path. If you are interesting on working on 
that, I can try to outline the steps needed to do that.
{quote}
This sounds promising to me! Are you suggesting that, instead of having to 
choose among the predefined serializers (protobuf, kryo, etc.), one could 
alternatively provide the name of a MessagePayloadSerializer class – e.g. 
com.mydomain.MySerializer – in the StateFun configuration, and then 
MessageFactory would create an instance of that class when serializers are 
needed? That sounds like a great option to me. I'd be happy to work on that and 
would appreciate any direction you could provide.
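
Just to make sure I understand the shape of it, something like this is what I'm 
picturing for the reflective lookup. This is purely illustrative – the helper 
class and how it gets wired into MessageFactory are assumptions on my part; 
only the MessagePayloadSerializer interface itself comes from the existing code.
{noformat}
// Illustrative sketch only -- this helper is not part of the codebase; it just
// shows how a serializer named in configuration could be instantiated.
package org.apache.flink.statefun.flink.core.message;

final class CustomPayloadSerializers {

  private CustomPayloadSerializers() {}

  static MessagePayloadSerializer fromClassName(String customPayloadSerializerClassName) {
    try {
      Class<?> clazz =
          Class.forName(
              customPayloadSerializerClassName,
              true,
              Thread.currentThread().getContextClassLoader());
      // The configured class must implement MessagePayloadSerializer and have a
      // public no-argument constructor.
      return clazz
          .asSubclass(MessagePayloadSerializer.class)
          .getDeclaredConstructor()
          .newInstance();
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalStateException(
          "Unable to instantiate custom payload serializer: " + customPayloadSerializerClassName,
          e);
    }
  }
}
{noformat}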

Thanks. 

> Support ScalaPB as a message payload serializer in Stateful Functions
> -
>
> Key: FLINK-19176
> URL: https://issues.apache.org/jira/browse/FLINK-19176
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Affects Versions: 2.0.0
>Reporter: Galen Warren
>Priority: Major
> Fix For: statefun-2.1.0
>
>
> Currently, Stateful Functions supports four options for serialization of 
> message payloads:
>  * Protobuf (based on code generated for Java)
>  * Kryo
>  * Multilanguage 
>  * Raw
> This proposal is to add a fifth option to this list, to support serialization 
> of message payloads based on [ScalaPB |[https://scalapb.github.io/docs/]]. 
> This would allow Scala developers to use ScalaPB-generated classes as message 
> types in Stateful Functions directly as message types, without having to 
> convert to/from code generated for Java and/or raw byte[] messages.
> This would be a simple implementation, as there is already a 
> [MessagePayloadSerializer 
> |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java]]
>  interface that is implemented for each of the existing serialization 
> methods; supporting ScalaPB would require a new class implementing 
> MessagePayloadSerializer in terms of ScalaPB-generated code, and a means to 
> select it, by adding a new value to the [MessageFactoryType 
> |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java]]enumeration.
>  Plus testing :)
> I've done this already locally, the implementation is very similar to the 
> existing [MessagePayloadSerializerPb 
> |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java]]implementation.
>  It uses a reference to ScalaPB in "provided" scope.
> Would you be interested in a pull request to add this? Primary benefit is to 
> make it easy to use Stateful Functions in a Scala environment. Thanks.
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions

2020-09-09 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193217#comment-17193217
 ] 

Galen Warren commented on FLINK-19176:
--

Sorry for the edits – I'm trying to get hyperlinks to appear correctly. 

> Support ScalaPB as a message payload serializer in Stateful Functions
> -
>
> Key: FLINK-19176
> URL: https://issues.apache.org/jira/browse/FLINK-19176
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Affects Versions: 2.0.0
>Reporter: Galen Warren
>Priority: Major
> Fix For: statefun-2.1.0
>
>
> Currently, Stateful Functions supports four options for serialization of 
> message payloads:
>  * Protobuf (based on code generated for Java)
>  * Kryo
>  * Multilanguage 
>  * Raw
> This proposal is to add a fifth option to this list, to support serialization 
> of message payloads based on [ScalaPB |[https://scalapb.github.io/docs/]]. 
> This would allow Scala developers to use ScalaPB-generated classes as message 
> types in Stateful Functions directly as message types, without having to 
> convert to/from code generated for Java and/or raw byte[] messages.
> This would be a simple implementation, as there is already a 
> [MessagePayloadSerializer 
> |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java]]
>  interface that is implemented for each of the existing serialization 
> methods; supporting ScalaPB would require a new class implementing 
> MessagePayloadSerializer in terms of ScalaPB-generated code, and a means to 
> select it, by adding a new value to the [MessageFactoryType 
> |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java]]enumeration.
>  Plus testing :)
> I've done this already locally, the implementation is very similar to the 
> existing [MessagePayloadSerializerPb 
> |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java]]implementation.
>  It uses a reference to ScalaPB in "provided" scope.
> Would you be interested in a pull request to add this? Primary benefit is to 
> make it easy to use Stateful Functions in a Scala environment. Thanks.
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions

2020-09-09 Thread Galen Warren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Galen Warren updated FLINK-19176:
-
Description: 
Currently, Stateful Functions supports four options for serialization of 
message payloads:
 * Protobuf (based on code generated for Java)
 * Kryo
 * Multilanguage 
 * Raw

This proposal is to add a fifth option to this list, to support serialization 
of message payloads based on [ScalaPB |[https://scalapb.github.io/docs/]]. This 
would allow Scala developers to use ScalaPB-generated classes as message types 
in Stateful Functions directly as message types, without having to convert 
to/from code generated for Java and/or raw byte[] messages.

This would be a simple implementation, as there is already a 
[MessagePayloadSerializer 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java]]
 interface that is implemented for each of the existing serialization methods; 
supporting ScalaPB would require a new class implementing 
MessagePayloadSerializer in terms of ScalaPB-generated code, and a means to 
select it, by adding a new value to the [MessageFactoryType 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java]]enumeration.
 Plus testing :)

I've done this already locally, the implementation is very similar to the 
existing [MessagePayloadSerializerPb 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java]]implementation.
 It uses a reference to ScalaPB in "provided" scope.

Would you be interested in a pull request to add this? Primary benefit is to 
make it easy to use Stateful Functions in a Scala environment. Thanks.

 

 

 

 

 

  was:
Currently, Stateful Functions supports four options for serialization of 
message payloads:
 * Protobuf (based on code generated for Java)
 * Kryo
 * Multilanguage 
 * Raw

This proposal is to add a fifth option to this list, to support serialization 
of message payloads based on [ScalaPB]([https://scalapb.github.io/docs/]). This 
would allow Scala developers to use ScalaPB-generated classes as message types 
in Stateful Functions directly as message types, without having to convert 
to/from code generated for Java and/or raw byte[] messages.

This would be a simple implementation, as there is already a 
[MessagePayloadSerializer 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java]]
 interface that is implemented for each of the existing serialization methods; 
supporting ScalaPB would require a new class implementing 
MessagePayloadSerializer in terms of ScalaPB-generated code, and a means to 
select it, by adding a new value to the [MessageFactoryType 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java]]enumeration.
 Plus testing :)

I've done this already locally, the implementation is very similar to the 
existing [MessagePayloadSerializerPb 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java]]implementation.
 It uses a reference to ScalaPB in "provided" scope.

Would you be interested in a pull request to add this? Primary benefit is to 
make it easy to use Stateful Functions in a Scala environment. Thanks.

 

 

 

 

 


> Support ScalaPB as a message payload serializer in Stateful Functions
> -
>
> Key: FLINK-19176
> URL: https://issues.apache.org/jira/browse/FLINK-19176
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Affects Versions: 2.0.0
>Reporter: Galen Warren
>Priority: Major
> Fix For: statefun-2.1.0
>
>
> Currently, Stateful Functions supports four options for serialization of 
> message payloads:
>  * Protobuf (based on code generated for Java)
>  * Kryo
>  * Multilanguage 
>  * Raw
> This proposal is to add a fifth option to this list, to support serialization 
> of message payloads based on [ScalaPB |[https://scalapb.github.io/docs/]]. 
> This would allow Scala developers to use ScalaPB-generated 

[jira] [Updated] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions

2020-09-09 Thread Galen Warren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Galen Warren updated FLINK-19176:
-
Description: 
Currently, Stateful Functions supports four options for serialization of 
message payloads:
 * Protobuf (based on code generated for Java)
 * Kryo
 * Multilanguage 
 * Raw

This proposal is to add a fifth option to this list, to support serialization 
of message payloads based on [ScalaPB]([https://scalapb.github.io/docs/]). This 
would allow Scala developers to use ScalaPB-generated classes as message types 
in Stateful Functions directly as message types, without having to convert 
to/from code generated for Java and/or raw byte[] messages.

This would be a simple implementation, as there is already a 
[MessagePayloadSerializer 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java]]
 interface that is implemented for each of the existing serialization methods; 
supporting ScalaPB would require a new class implementing 
MessagePayloadSerializer in terms of ScalaPB-generated code, and a means to 
select it, by adding a new value to the [MessageFactoryType 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java]]enumeration.
 Plus testing :)

I've done this already locally, the implementation is very similar to the 
existing [MessagePayloadSerializerPb 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java]]implementation.
 It uses a reference to ScalaPB in "provided" scope.

Would you be interested in a pull request to add this? Primary benefit is to 
make it easy to use Stateful Functions in a Scala environment. Thanks.

 

 

 

 

 

  was:
Currently, Stateful Functions supports four options for serialization of 
message payloads:
 * Protobuf (based on code generated for Java)
 * Kryo
 * Multilanguage 
 * Raw

This proposal is to add a fifth option to this list, to support serialization 
of message payloads based on [ScalaPB](|[https://scalapb.github.io/docs/]). 
This would allow Scala developers to use ScalaPB-generated classes as message 
types in Stateful Functions directly as message types, without having to 
convert to/from code generated for Java and/or raw byte[] messages.

This would be a simple implementation, as there is already a 
[MessagePayloadSerializer 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java]]
 interface that is implemented for each of the existing serialization methods; 
supporting ScalaPB would require a new class implementing 
MessagePayloadSerializer in terms of ScalaPB-generated code, and a means to 
select it, by adding a new value to the [MessageFactoryType 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java]]enumeration.
 Plus testing :)

I've done this already locally, the implementation is very similar to the 
existing [MessagePayloadSerializerPb 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java]]implementation.
 It uses a reference to ScalaPB in "provided" scope.

Would you be interested in a pull request to add this? Primary benefit is to 
make it easy to use Stateful Functions in a Scala environment. Thanks.

 

 

 

 

 


> Support ScalaPB as a message payload serializer in Stateful Functions
> -
>
> Key: FLINK-19176
> URL: https://issues.apache.org/jira/browse/FLINK-19176
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Affects Versions: 2.0.0
>Reporter: Galen Warren
>Priority: Major
> Fix For: statefun-2.1.0
>
>
> Currently, Stateful Functions supports four options for serialization of 
> message payloads:
>  * Protobuf (based on code generated for Java)
>  * Kryo
>  * Multilanguage 
>  * Raw
> This proposal is to add a fifth option to this list, to support serialization 
> of message payloads based on [ScalaPB]([https://scalapb.github.io/docs/]). 
> This would allow Scala developers to use ScalaPB-generated 

[jira] [Updated] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions

2020-09-09 Thread Galen Warren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Galen Warren updated FLINK-19176:
-
Description: 
Currently, Stateful Functions supports four options for serialization of 
message payloads:
 * Protobuf (based on code generated for Java)
 * Kryo
 * Multilanguage 
 * Raw

This proposal is to add a fifth option to this list, to support serialization 
of message payloads based on [ScalaPB](|[https://scalapb.github.io/docs/]). 
This would allow Scala developers to use ScalaPB-generated classes as message 
types in Stateful Functions directly as message types, without having to 
convert to/from code generated for Java and/or raw byte[] messages.

This would be a simple implementation, as there is already a 
[MessagePayloadSerializer 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java]]
 interface that is implemented for each of the existing serialization methods; 
supporting ScalaPB would require a new class implementing 
MessagePayloadSerializer in terms of ScalaPB-generated code, and a means to 
select it, by adding a new value to the [MessageFactoryType 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java]]enumeration.
 Plus testing :)

I've done this already locally, the implementation is very similar to the 
existing [MessagePayloadSerializerPb 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java]]implementation.
 It uses a reference to ScalaPB in "provided" scope.

Would you be interested in a pull request to add this? Primary benefit is to 
make it easy to use Stateful Functions in a Scala environment. Thanks.

 

 

 

 

 

  was:
Currently, Stateful Functions supports four options for serialization of 
message payloads:
 * Protobuf (based on code generated for Java)
 * Kryo
 * Multilanguage 
 * Raw

This proposal is to add a fifth option to this list, to support serialization 
of message payloads based on [ScalaPB|[https://scalapb.github.io/docs/]]. This 
would allow Scala developers to use ScalaPB-generated classes as message types 
in Stateful Functions directly as message types, without having to convert 
to/from code generated for Java and/or raw byte[] messages.

This would be a simple implementation, as there is already a 
[MessagePayloadSerializer 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java]]
 interface that is implemented for each of the existing serialization methods; 
supporting ScalaPB would require a new class implementing 
MessagePayloadSerializer in terms of ScalaPB-generated code, and a means to 
select it, by adding a new value to the [MessageFactoryType 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java]]enumeration.
 Plus testing :)

I've done this already locally, the implementation is very similar to the 
existing [MessagePayloadSerializerPb 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java]]implementation.
 It uses a reference to ScalaPB in "provided" scope.

Would you be interested in a pull request to add this? Primary benefit is to 
make it easy to use Stateful Functions in a Scala environment. Thanks.

 

 

 

 

 


> Support ScalaPB as a message payload serializer in Stateful Functions
> -
>
> Key: FLINK-19176
> URL: https://issues.apache.org/jira/browse/FLINK-19176
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Affects Versions: 2.0.0
>Reporter: Galen Warren
>Priority: Major
> Fix For: statefun-2.1.0
>
>
> Currently, Stateful Functions supports four options for serialization of 
> message payloads:
>  * Protobuf (based on code generated for Java)
>  * Kryo
>  * Multilanguage 
>  * Raw
> This proposal is to add a fifth option to this list, to support serialization 
> of message payloads based on [ScalaPB](|[https://scalapb.github.io/docs/]). 
> This would allow Scala developers to use ScalaPB-generated 

[jira] [Created] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions

2020-09-09 Thread Galen Warren (Jira)
Galen Warren created FLINK-19176:


 Summary: Support ScalaPB as a message payload serializer in 
Stateful Functions
 Key: FLINK-19176
 URL: https://issues.apache.org/jira/browse/FLINK-19176
 Project: Flink
  Issue Type: Improvement
  Components: Stateful Functions
Affects Versions: 2.0.0
Reporter: Galen Warren
 Fix For: statefun-2.1.0


Currently, Stateful Functions supports four options for serialization of 
message payloads:
 * Protobuf (based on code generated for Java)
 * Kryo
 * Multilanguage 
 * Raw

This proposal is to add a fifth option to this list, to support serialization 
of message payloads based on [ScalaPB|[https://scalapb.github.io/docs/]]. This 
would allow Scala developers to use ScalaPB-generated classes as message types 
in Stateful Functions directly as message types, without having to convert 
to/from code generated for Java and/or raw byte[] messages.

This would be a simple implementation, as there is already a 
[MessagePayloadSerializer 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java]]
 interface that is implemented for each of the existing serialization methods; 
supporting ScalaPB would require a new class implementing 
MessagePayloadSerializer in terms of ScalaPB-generated code, and a means to 
select it, by adding a new value to the [MessageFactoryType 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java]]enumeration.
 Plus testing :)

I've done this already locally, the implementation is very similar to the 
existing [MessagePayloadSerializerPb 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java]]implementation.
 It uses a reference to ScalaPB in "provided" scope.

Would you be interested in a pull request to add this? Primary benefit is to 
make it easy to use Stateful Functions in a Scala environment. Thanks.

 

 

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-11055) Allow Queryable State to be transformed on the TaskManager before being returned to the client

2018-12-03 Thread Galen Warren (JIRA)
Galen Warren created FLINK-11055:


 Summary: Allow Queryable State to be transformed on the 
TaskManager before being returned to the client
 Key: FLINK-11055
 URL: https://issues.apache.org/jira/browse/FLINK-11055
 Project: Flink
  Issue Type: Improvement
  Components: Queryable State
Reporter: Galen Warren
 Fix For: 1.7.0


The proposal here is to enhance the way Queryable State works to allow for the 
state object to be transformed on the TaskManager before being returned to the 
client. As an example, if some MapState were made queryable, such a 
transform might look up a specific key in the map and return its corresponding 
value, resulting in only that value being returned to the client instead of the 
entire map. This could be useful in cases where the client only wants a portion 
of the state and the state is large (this is my use case).

At a high level, I think this could be accomplished by adding an (optional) 
serializable Function into KvStateRequest (and related 
classes?) and having that transform be applied in the QueryableStateServer (or 
QueryableStateClientProxy?). I expect some additional TypeInformation would 
also have to be supplied/used in places. It should be doable in a backwards 
compatible way such that if the client does not specify a transform it works 
exactly as it does now.
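
To make the idea a bit more concrete, here is a purely hypothetical sketch of 
what such a transform might look like for a queryable MapState. The 
StateTransformFunction interface does not exist in Flink; it just illustrates 
the kind of serializable function I have in mind, applied on the TaskManager 
before the result is sent back.
{noformat}
// Hypothetical sketch only -- StateTransformFunction is not a real Flink type;
// it illustrates the proposed serializable transform.
import java.io.Serializable;
import org.apache.flink.api.common.state.MapState;

public interface StateTransformFunction<S, T> extends Serializable {
  T transform(S state) throws Exception;
}

// Example: look up a single key in a queryable MapState so that only the
// matching value, not the whole map, is returned to the client.
class SingleEntryTransform implements StateTransformFunction<MapState<String, Long>, Long> {

  private final String key;

  SingleEntryTransform(String key) {
    this.key = key;
  }

  @Override
  public Long transform(MapState<String, Long> state) throws Exception {
    return state.get(key);
  }
}
{noformat}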

Would there be any interest in a PR for this? It would help with something I'm 
currently working on, and I'd be willing to take a crack at it. If there is 
interest, I'll be happy to do some more research to come up with a more 
concrete proposal.

Thanks for Flink - it's great!

 

 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)