[jira] [Commented] (FLINK-35232) Support for retry settings on GCS connector
[ https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841999#comment-17841999 ] Galen Warren commented on FLINK-35232:
--
[~xtsong] You helped with the original implementation of the GCS RecoverableWriter. This proposal, to allow people to adjust the retry settings, makes sense to me, but I don't know what guidance to give the people proposing it about potential next steps. How would something like this move forward? Thanks.

> Support for retry settings on GCS connector
> ---
>
> Key: FLINK-35232
> URL: https://issues.apache.org/jira/browse/FLINK-35232
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Affects Versions: 1.15.3, 1.16.2, 1.17.1, 1.18.1, 1.19.0
> Reporter: Vikas M
> Assignee: Ravi Singh
> Priority: Major
>
> https://issues.apache.org/jira/browse/FLINK-32877 tracks the ability to specify transport options in the GCS connector. While setting the params enabled there reduced read timeouts, we still see 503 errors leading to Flink job restarts.
> Thus, in this ticket, we want to specify additional retry settings as noted in [https://cloud.google.com/storage/docs/retry-strategy#customize-retries]. We need [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods] methods available to Flink users so that they can customize their deployment.
> In particular, the following settings seem to be the minimum required to align GCS timeouts with a job's checkpoint config:
> * [maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__]
> * [initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__]
> * [rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__]
> * [maxRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxRpcTimeout__]
> * [totalTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getTotalTimeout__]
>
> Basically, the proposal is to be able to tune the timeout via the multiplier, maxAttempts, and totalTimeout mechanisms. All of the config options should be optional, and defaults should be used for any that are not provided.
--
This message was sent by Atlassian Jira (v8.20.10#820010)
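Per the gax RetrySettings documentation, these parameters interact as follows: the per-attempt RPC timeout starts at initialRpcTimeout and grows by rpcTimeoutMultiplier, capped at maxRpcTimeout, while maxAttempts and totalTimeout bound the overall retry budget. A minimal sketch of that arithmetic (illustrative numbers only; this is not the Flink connector API or the gax implementation):

```java
// Illustration of how the RetrySettings parameters listed above interact.
// All values are hypothetical, chosen to show the capping behavior.
public class RetryScheduleSketch {

    // RPC timeout for a given attempt (0-based): initial * multiplier^attempt,
    // capped at maxRpcTimeout.
    static long rpcTimeoutForAttempt(long initialMs, double multiplier, long maxMs, int attempt) {
        double t = initialMs * Math.pow(multiplier, attempt);
        return Math.min((long) t, maxMs);
    }

    public static void main(String[] args) {
        long initialMs = 5_000;      // initialRpcTimeout
        double multiplier = 2.0;     // rpcTimeoutMultiplier
        long maxMs = 30_000;         // maxRpcTimeout
        int maxAttempts = 6;         // maxAttempts
        long totalBudgetMs = 50_000; // totalTimeout: overall budget across attempts

        long elapsed = 0;
        for (int attempt = 0; attempt < maxAttempts && elapsed < totalBudgetMs; attempt++) {
            long timeout = rpcTimeoutForAttempt(initialMs, multiplier, maxMs, attempt);
            System.out.println("attempt " + attempt + ": rpc timeout " + timeout + " ms");
            elapsed += timeout; // worst case: the attempt uses its whole timeout
        }
    }
}
```

With these (made-up) numbers, the timeout doubles each attempt until it hits the 30-second cap, and the loop stops once either the attempt count or the total budget is exhausted, which is the tuning lever the ticket is asking to expose.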
[jira] [Commented] (FLINK-35232) Support for retry settings on GCS connector
[ https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840942#comment-17840942 ] Galen Warren commented on FLINK-35232:
--
Yes, that's using the RecoverableWriter. The default retry settings are here: [Retry strategy | Cloud Storage | Google Cloud|https://cloud.google.com/storage/docs/retry-strategy#java], so I believe it should already be retrying up to 6 times, for up to 50 seconds, as is. Which settings would you be looking to implement? And are you sure nothing else is going on that's causing the failures (network connectivity, etc.)? Full disclosure: I worked on the RecoverableWriter, but I'm not a committer, so I can't approve anything on my own. But being able to adjust these settings seems reasonable to me.
[jira] [Commented] (FLINK-35232) Support for retry settings on GCS connector
[ https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840862#comment-17840862 ] Galen Warren commented on FLINK-35232:
--
In case it's relevant, reading/writing to GCS happens in two different ways in Flink. First, the basic connector wraps a Google-provided Hadoop connector and leverages Flink's support for reading/writing to Hadoop file systems. Second, the RecoverableWriter support uses the Google Java library directly; the extra features associated with RecoverableWriter require this. I mention this because the linked issue says that Hadoop libraries aren't used, which isn't always the case. From your description, I'm not sure in which mode you're using the connector.
[jira] [Commented] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs
[ https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830956#comment-17830956 ] Galen Warren commented on FLINK-34696:
--
I'm not a committer so I can't approve anything on my own, but if we nail down the proposal I can loop in the person who helped me with the original implementation to see how to proceed. To make sure we're on the same page, are we talking about implementing the more efficient composing algorithm (O(N log N) vs. O(N^2))? I would suggest making the new algorithm opt-in.

> GSRecoverableWriterCommitter is generating excessive data blobs
> ---
>
> Key: FLINK-34696
> URL: https://issues.apache.org/jira/browse/FLINK-34696
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Reporter: Simon-Shlomo Poil
> Priority: Major
>
> The `composeBlobs` method in `org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter` is designed to merge multiple small blobs into a single large blob using Google Cloud Storage's compose method. This process is iterative, combining the result from the previous iteration with 31 new blobs until all blobs are merged. Upon completion of the composition, the method removes the temporary blobs.
>
> *Issue:*
> This methodology results in significant, unnecessary data storage consumption during the blob composition process, incurring considerable costs due to Google Cloud Storage pricing models.
>
> *Example to Illustrate the Problem:*
> - Initial state: 64 blobs, each 1 GB in size (totaling 64 GB).
> - After 1st step: 32 blobs are merged into a single blob, increasing total storage to 96 GB (64 GB original + 32 GB new).
> - After 2nd step: The newly created 32 GB blob is merged with 31 more blobs, raising the total to 159 GB.
> - After 3rd step: The final blob is merged, culminating in a total of 223 GB written to combine the original 64 GB of data. This results in an overhead of 159 GB.
>
> *Impact:*
> This inefficiency has a profound impact, especially at scale, where terabytes of data can incur overheads in the petabyte range, leading to unexpectedly high costs. Additionally, we have observed an increase in storage exceptions thrown by the Google Storage library, potentially linked to this issue.
>
> *Suggested Solution:*
> To mitigate this problem, we propose modifying the `composeBlobs` method to immediately delete source blobs once they have been successfully combined. This change could significantly reduce data duplication and associated costs. However, the implications for data recovery and integrity need careful consideration, to ensure that this optimization does not compromise the ability to recover data in case of a failure during the composition process.
>
> *Steps to Reproduce:*
> 1. Initiate the blob composition process in an environment with a significant number of blobs (e.g., 64 blobs of 1 GB each).
> 2. Observe the temporary increase in data storage as blobs are iteratively combined.
> 3. Note the final amount of data storage used compared to the initial total size of the blobs.
>
> *Expected Behavior:*
> The blob composition process should minimize unnecessary data storage use, efficiently managing resources to combine blobs without generating excessive temporary data overhead.
>
> *Actual Behavior:*
> The current implementation results in significant temporary increases in data storage, leading to high costs and potential system instability due to frequent storage exceptions.
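The overhead figures in the example above follow directly from the iterative compose pattern the issue describes: the running result is rewritten at every step, and each step writes a new intermediate blob that lingers until final cleanup. A small sketch reproducing the arithmetic (a model of the described behavior, not the actual committer code):

```java
// Models the storage overhead described above: the running composed blob is
// combined with up to 31 new 1 GB blobs per step, and intermediates are not
// deleted until the end, so every step's output adds to cumulative storage.
public class ComposeOverheadSketch {

    // Total GB ever written for n source blobs of 1 GB each.
    static long cumulativeStorageGb(int n) {
        long total = n;        // the original source blobs
        long composed = 0;     // size of the running composed blob
        int remaining = n;
        while (remaining > 0) {
            // first step can take 32 sources; later steps carry the running
            // result as one of the 32 compose inputs, leaving room for 31 new
            int take = Math.min(composed == 0 ? 32 : 31, remaining);
            composed += take;
            remaining -= take;
            total += composed; // each step writes a new intermediate blob
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(cumulativeStorageGb(64) + " GB"); // matches the 223 GB example
    }
}
```

Because the running blob is rewritten in full at each step, the total bytes written grow roughly quadratically in the number of source blobs, which is why deleting intermediates early (or a tree-shaped compose) matters at scale.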
[jira] [Commented] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs
[ https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829246#comment-17829246 ] Galen Warren commented on FLINK-34696:
--
I'm not aware of any special considerations in RecoverableWriter for batch mode, but to be honest I'm not very knowledgeable about batch mode in general. Maybe someone else knows more about that...
{quote}The code above also avoids inserting the intermediate compose blob identifier at index 0. That looks expensive if the list is very big (and as far as I see not a LinkedList){quote}
I don't think it's really inserting anything, as the blobs are immutable. I think that just controls the order of the bytes in the newly created blob.
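The point about list order can be illustrated with a stand-in compose that simply concatenates inputs in source-list order (a hypothetical sketch; the real GCS compose operates on blob objects, not strings):

```java
import java.util.List;

// Illustration of the ordering point above: the position of a blob in the
// compose source list only determines where its bytes land in the result;
// nothing is mutated or "inserted" into any existing blob.
public class ComposeOrderSketch {

    // Stand-in for compose: the result is the inputs' bytes in list order.
    static String compose(List<String> sources) {
        return String.join("", sources);
    }

    public static void main(String[] args) {
        // Putting the running intermediate first keeps the earlier data at the
        // front of the newly created blob.
        System.out.println(compose(List.of("intermediate", "-part1", "-part2")));
    }
}
```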
[jira] [Comment Edited] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs
[ https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828412#comment-17828412 ] Galen Warren edited comment on FLINK-34696 at 3/19/24 5:07 PM:
---
I think your proposed algorithm is just another way of implementing the "delete intermediate blobs that are no longer necessary as soon as possible" idea we've considered. You accomplish it by overwriting the staging blob on each iteration; a similar effect could be achieved by writing to a new intermediate staging blob on each iteration (as is done now) and deleting the old one right away (which is not done now; instead, this deletion occurs shortly thereafter, after the commit succeeds).

Aside: I'm not sure whether it's possible to overwrite the existing staging blob as you suggest. The [docs|https://cloud.google.com/storage/docs/json_api/v1/objects/compose] for the compose operation say:
{quote}Concatenates a list of existing objects into a new object in the same bucket. The existing source objects are unaffected by this operation.{quote}
In your proposal, the same staging blob is both the target of the compose operation and one of the input blobs to be composed. That _might_ work, but it would have to be tested to see if it's allowed. If it isn't, writing to a new blob and deleting the old one would have essentially the same effect. That's almost certainly what happens behind the scenes anyway, since blobs are immutable.

Bigger picture: if you're trying to combine millions of immutable blobs together in one step, 32 at a time, I don't see how you avoid having lots of intermediate composed blobs, one way or another, at least temporarily. The main question is how quickly the ones that are no longer needed are discarded.
{quote}To streamline this, we could modify {{GSCommitRecoverable}} to update the list of {{componentObjectIds}}, allowing the removal of blobs already appended to the {{stagingBlob}}{quote}
Not quite following you here; the GSCommitRecoverable is provided as an input to the GSRecoverableWriterCommitter, supplying the list of "raw" (i.e. uncomposed) temporary blobs that need to be composed and committed. If you removed those raw blobs from that list as they're added to the staging blob, what would that accomplish? The GSCommitRecoverable provided to the GSRecoverableWriterCommitter isn't persisted anywhere after the commit succeeds or fails, and if the commit were to be retried it would need the complete list anyway.
{quote}I notice another issue with the code - the storage.compose call might throw a StorageException. With the current code this would mean the intermediate composition blobs are not cleaned up. If the code above is implemented, I think there is no longer a need for the TTL feature, since all necessary blobs should be written to a final blob. Any leftover blobs post-job completion would indicate a failed state.{quote}
There is no real way to prevent the orphaning of intermediate blobs in all failure scenarios. Even in your proposed algorithm, the staging blob could be orphaned if the composition process failed partway through. This is what the temp bucket and TTL mechanism are for. It's optional, so you don't have to use it, but yes, you would have to keep an eye out for orphaned intermediate blobs via some other mechanism if you choose not to use it.

Honestly, I think some of your difficulties come from trying to combine so much data at once vs. doing it along the way, with commits at incremental checkpoints. I was going to suggest reducing your checkpoint interval, to aggregate more frequently, but obviously that doesn't work if you're not checkpointing at all.
I do think the RecoverableWriter mechanism is designed to make writing predictable and repeatable in checkpoint/recovery situations (hence the name); if you're not doing any of that, you may run into some challenges. If you were to use checkpoints, you would aggregate more frequently, meaning fewer intermediate blobs with shorter lifetimes, and you'd be less likely to run up against the 5 TB limit, as you would end up with a series of smaller files, written at each checkpoint, vs. one huge file.

If we do want to consider changes, here's what I suggest:
* Test whether GCP allows composition to overwrite an existing blob that is also an input to the compose operation. If that works, then we could use that technique in the composeBlobs function. As mentioned above, I doubt that this actually reduces the number of blobs that exist temporarily, since blobs are immutable objects, but it would minimize their lifetime, effectively deleting them as soon as possible. If overwriting doesn't work, then we could achieve a similar effect by deleting intermediate blobs as soon as they are no longer needed.
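The "delete the previous intermediate as soon as the next compose succeeds" variant discussed here can be sketched as follows. This is a hypothetical illustration, not the actual GSRecoverableWriterCommitter code: compose() and delete() stand in for the real storage calls, and only the 32-input compose limit mirrors the GCS constraint.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of eager intermediate-blob deletion during composition.
public class EagerDeleteComposeSketch {

    // Stand-in for the storage client; not a real GCS API.
    interface Store {
        String compose(List<String> sources, String target); // composes up to 32 sources
        void delete(String blob);
    }

    // Compose all sources into one blob, deleting each superseded intermediate
    // as soon as the compose that replaces it has succeeded.
    static String composeAll(Store store, List<String> sources, String targetPrefix) {
        String current = null;
        int step = 0;
        int i = 0;
        while (i < sources.size()) {
            List<String> batch = new ArrayList<>();
            if (current != null) batch.add(current); // carry the running result
            while (batch.size() < 32 && i < sources.size()) batch.add(sources.get(i++));
            String next = store.compose(batch, targetPrefix + "-" + step++);
            if (current != null) store.delete(current); // drop the old intermediate right away
            current = next;
        }
        return current;
    }
}
```

Note the failure mode discussed above is still present: if the process dies between compose() and delete(), one intermediate is orphaned, which is exactly what the temp bucket and TTL mechanism are meant to catch.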
[jira] [Comment Edited] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs
[ https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828412#comment-17828412 ] Galen Warren edited comment on FLINK-34696 at 3/19/24 5:07 PM: --- I think your proposed algorithm is just another way of implementing the "delete intermediate blobs that are no longer necessary as soon as possible" idea we've considered. You accomplish it by overwriting the staging blob on each iteration; a similar effect could be achieved by writing to a new intermediate staging blob on each iteration (as is done now) and deleting the old one right away (which is not done now. instead this deletion occurs shortly thereafter, after the commit succeeds). Aside: I'm not sure whether it's possible to overwrite the existing staging blob like you suggest. The [docs |[Objects: compose | Cloud Storage | Google Cloud|https://cloud.google.com/storage/docs/json_api/v1/objects/compose]] for the compose operation say: {quote}Concatenates a list of existing objects into a new object in the same bucket. The existing source objects are unaffected by this operation {quote} In your proposal, the same staging blob is both the target of the compose operation and one of the input blobs to be composed. That _might_ work, but would have to be tested to see if it's allowed. If it isn't, writing to a new blob and deleting the old one would have essentially the same effect. That's almost certainly what happens behind the scenes anyway, since blobs are immutable. Bigger picture, if you're trying to combine millions of immutable blobs together in one step, 32 at a time, I don't see how you avoid having lots of intermediate composed blobs, one way or another, at least temporarily. The main question would be how quickly ones that are no longer needed are discarded. 
{quote}To streamline this, we could modify {{GSCommitRecoverable}} to update the list of {{{}componentObjectIds{}}}, allowing the removal of blobs already appended to the {{stagingBlob}} {quote} Not quite following you here; the GSCommitRecoverable is provided as an input to the GSRecoverableWriterCommitter, providing the list of "raw" (i.e. uncomposed) temporary blobs that need to be composed and committed. If you removed those raw blobs from that list as they're added to the staging blob, what would that accomplish? The GSCommitRecoverable provided to the GSRecoverableWriterCommitter isn't persisted anywhere after the commit succeeds or fails, and if the commit were to be retried it would need the complete list anyway. {quote}I notice an other issue with the code - because the storage.compose might throw a StorageException. With the current code this would mean the intermediate composition blobs are not cleaned up. If the code above is implemented, I think there is no longer a need the TTL feature since all necessary blobs should be written to a final blob. Any leftover blobs post-job completion would indicate a failed state. {quote} There is no real way to prevent the orphaning of intermediate blobs in all failure scenarios. Even in your proposed algorithm, the staging blob could be orphaned if the composition process failed partway through. This is what the temp bucket and TTL mechanism are for. It's optional, so you don't have to use it, but yes you would have to keep an eye out for orphaned intermediate blobs via some other mechanism f you choose not to use it. Honestly, I think some of your difficulties are coming from trying to combine so much data together at once vs. doing it along the way, with commits at incremental checkpoints. I was going to suggest you reduce your checkpoint interval, to aggregate more frequently, but obviously that doesn't work if you're not checkpointing at all. 
I do think the RecoverableWriter mechanism is designed to make writing predictable and repeatable in checkpoint/recovery situations (hence the name); if you're not doing any of that, you may run into some challenges. If you were to use checkpoints, you would aggregate more frequently, meaning fewer intermediate blobs with shorter lifetimes, and you'd be less likely to run up against the 5TB limit, as you would end up with a series of smaller files, written at each checkpoint, vs. one huge file. If we do want to consider changes, here's what I suggest: * Test whether GCP allows composition to overwrite an existing blob that is also an input to the compose operation. If that works, then we could use that technique in the composeBlobs function. As mentioned above, I doubt that this actually reduces the number of blobs that exist temporarily – since blobs are immutable objects – but it would minimize their lifetime, effectively deleting them as soon as possible. If overwriting doesn't work, then we could achieve a similar effect by deleting intermediate blobs as soon as they are not needed anymore,
[jira] [Comment Edited] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs
[ https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828412#comment-17828412 ] Galen Warren edited comment on FLINK-34696 at 3/19/24 5:06 PM: --- I think your proposed algorithm is just another way of implementing the "delete intermediate blobs that are no longer necessary as soon as possible" idea we've considered. You accomplish it by overwriting the staging blob on each iteration; a similar effect could be achieved by writing to a new intermediate staging blob on each iteration (as is done now) and deleting the old one right away (which is not done now. instead this deletion occurs shortly thereafter, after the commit succeeds). Aside: I'm not sure whether it's possible to overwrite the existing staging blob like you suggest. The [docs |[Objects: compose | Cloud Storage | Google Cloud|https://cloud.google.com/storage/docs/json_api/v1/objects/compose]] for the compose operation say: {quote}Concatenates a list of existing objects into a new object in the same bucket. The existing source objects are unaffected by this operation {quote} In your proposal, the same staging blob is both the target of the compose operation and one of the input blobs to be composed. That _might_ work, but would have to be tested to see if it's allowed. If it isn't, writing to a new blob and deleting the old one would have essentially the same effect. That's almost certainly what happens behind the scenes anyway, since blobs are immutable. Bigger picture, if you're trying to combine millions of immutable blobs together in one step, 32 at a time, I don't see how you avoid having lots of intermediate composed blobs, one way or another, at least temporarily. The main question would be how quickly ones that are no longer needed are discarded. 
{quote}To streamline this, we could modify {{GSCommitRecoverable}} to update the list of {{{}componentObjectIds{}}}, allowing the removal of blobs already appended to the {{stagingBlob}} {quote} Not quite following you here; the GSCommitRecoverable is provided as an input to the GSRecoverableWriterCommitter, providing the list of "raw" (i.e. uncomposed) temporary blobs that need to be composed and committed. If you removed those raw blobs from that list as they're added to the staging blob, what would that accomplish? The GSCommitRecoverable provided to the GSRecoverableWriterCommitter isn't persisted anywhere after the commit succeeds or fails, and if the commit were to be retried it would need the complete list anyway. {quote}I notice an other issue with the code - because the storage.compose might throw a StorageException. With the current code this would mean the intermediate composition blobs are not cleaned up. If the code above is implemented, I think there is no longer a need the TTL feature since all necessary blobs should be written to a final blob. Any leftover blobs post-job completion would indicate a failed state. {quote} There is no real way to prevent the orphaning of intermediate blobs in all failure scenarios. Even in your proposed algorithm, the staging blob could be orphaned if the composition process failed partway through. This is what the temp bucket and TTL mechanism are for. It's optional, so you don't have to use it, but yes you would have to keep an eye out for orphaned intermediate blobs via some other mechanism f you choose not to use it. Honestly, I think some of your difficulties are coming from trying to combine so much data together at once vs. doing it along the way, with commits at incremental checkpoints. I was going to suggest you reduce your checkpoint interval, to aggregate more frequently, but obviously that doesn't work if you're not checkpointing at all. 
I do think the RecoverableWriter mechanism is designed to make writing predictable and repeatable in checkpoint/recovery situations (hence the name); if you're not doing any of that, you may run into some challenges. If you were to use checkpoints, you would aggregate more frequently, meaning fewer intermediate blobs with shorter lifetimes, and you'd be less likely to run up against the 5TB limit, as you would end up with a series of smaller files, written at each checkpoint, vs. one huge file. If we do want to consider changes, here's what I suggest: * Test whether GCP allows composition to overwrite an existing blob that is also an input to the compose operation. If that works, then we could use that technique in the composeBlobs function. As mentioned above, I doubt that this actually reduces the number of blobs that exist temporarily – since blobs are immutable objects – but it would minimize their lifetime, effectively deleting them as soon as possible. If overwriting doesn't work, then we could achieve a similar effect by deleting intermediate blobs as soon as they are not needed anymore,
[jira] [Comment Edited] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs
[jira] [Commented] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs
[ https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828412#comment-17828412 ] Galen Warren commented on FLINK-34696: -- I think your proposed algorithm is just another way of implementing the "delete intermediate blobs that are no longer necessary as soon as possible" idea we've considered. You accomplish it by overwriting the staging blob on each iteration; a similar effect could be achieved by writing to a new intermediate staging blob on each iteration (as is done now) and deleting the old one right away (which is not done now; instead, this deletion occurs shortly thereafter, after the commit succeeds).

Aside: I'm not sure whether it's possible to overwrite the existing staging blob as you suggest. The [docs|https://cloud.google.com/storage/docs/json_api/v1/objects/compose] for the compose operation say: {quote}Concatenates a list of existing objects into a new object in the same bucket. The existing source objects are unaffected by this operation{quote} In your proposal, the same staging blob is both the target of the compose operation and one of the input blobs to be composed. That _might_ work, but it would have to be tested to see if it's allowed. If it isn't, writing to a new blob and deleting the old one would have essentially the same effect; that's almost certainly what happens behind the scenes anyway, since blobs are immutable.

Bigger picture: if you're trying to combine millions of immutable blobs in one step, 32 at a time, I don't see how you avoid having lots of intermediate composed blobs, one way or another, at least temporarily. The main question is how quickly the ones that are no longer needed are discarded.
{quote}To streamline this, we could modify {{GSCommitRecoverable}} to update the list of {{componentObjectIds}}, allowing the removal of blobs already appended to the {{stagingBlob}}{quote} Not quite following you here; the GSCommitRecoverable is provided as an input to the GSRecoverableWriterCommitter, supplying the list of "raw" (i.e. uncomposed) temporary blobs that need to be composed and committed. If you removed those raw blobs from that list as they're added to the staging blob, what would that accomplish? The GSCommitRecoverable provided to the GSRecoverableWriterCommitter isn't persisted anywhere after the commit succeeds or fails, and if the commit were retried it would need the complete list anyway.

{quote}I notice an other issue with the code - because the storage.compose might throw a StorageException. With the current code this would mean the intermediate composition blobs are not cleaned up. If the code above is implemented, I think there is no longer a need the TTL feature since all necessary blobs should be written to a final blob. Any leftover blobs post-job completion would indicate a failed state. {quote} There is no real way to prevent the orphaning of intermediate blobs in all failure scenarios. Even in your proposed algorithm, the staging blob could be orphaned if the composition process failed partway through. This is what the temp bucket and TTL mechanism are for. It's optional, so you don't have to use it, but yes, you would have to keep an eye out for orphaned intermediate blobs via some other mechanism if you choose not to use it.

Honestly, I think some of your difficulties come from trying to combine so much data at once vs. doing it along the way, with commits at incremental checkpoints. I was going to suggest you reduce your checkpoint interval, to aggregate more frequently, but obviously that doesn't work if you're not checkpointing at all.
I do think the RecoverableWriter mechanism is designed to make writing predictable and repeatable in checkpoint/recovery situations (hence the name); if you're not doing any of that, you may run into some challenges. If you were to use checkpoints, you would aggregate more frequently, meaning fewer intermediate blobs with shorter lifetimes, and you'd be less likely to run up against the 5 TB limit, since you would end up with a series of smaller files, written at each checkpoint, vs. one huge file.

If we do want to consider changes, here's what I suggest:
* Test whether GCP allows composition to overwrite an existing blob that is also an input to the compose operation. If that works, we could use that technique in the composeBlobs function. As mentioned above, I doubt this actually reduces the number of blobs that exist temporarily – since blobs are immutable objects – but it would minimize their lifetime, effectively deleting them as soon as possible. If overwriting doesn't work, we could achieve a similar effect by deleting intermediate blobs as soon as they are no longer needed, rather than waiting until the commit succeeds.
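A quick toy simulation illustrates the point: eager deletion shortens the lifetime of intermediate blobs without reducing how many compose operations (or intermediate blobs) are created. The function below is purely illustrative, not the connector's code; it folds blobs 32 at a time and tracks how many intermediate blobs are alive at once under each deletion policy.

```python
# Toy model of GCS compose batching (32-source limit), comparing when
# intermediate composed blobs are deleted. Illustrative only; not the
# actual GSRecoverableWriterCommitter logic.
COMPOSE_LIMIT = 32

def fold(num_blobs: int, eager_delete: bool):
    """Fold `num_blobs` raw blobs into one, 32 at a time.

    Returns (compose_ops, max_live_intermediates).
    """
    remaining = num_blobs
    ops = 0
    live = 0        # intermediate composed blobs currently in storage
    max_live = 0
    while remaining > 1:
        batch = min(COMPOSE_LIMIT, remaining)
        ops += 1
        live += 1                   # the newly composed blob
        max_live = max(max_live, live)
        if eager_delete and live > 1:
            live -= 1               # prior intermediate dropped right away
        remaining = remaining - batch + 1
    return ops, max_live

print(fold(94, eager_delete=False))
print(fold(94, eager_delete=True))
```

Either way the same number of intermediate blobs get created; eager deletion just caps how many coexist, which is the "minimize their lifetime" effect described above.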
[jira] [Comment Edited] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs
[ https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828081#comment-17828081 ] Galen Warren edited comment on FLINK-34696 at 3/18/24 7:35 PM: --- Re: {*}Composition{*}: My understanding is that Flink recoverable writers are supposed to follow a few rules, one of which is that data should not be visible in the final destination until a successful commit occurs. From the description of the [RecoverableWriter|https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html]: {quote}The RecoverableWriter creates and recovers [{{RecoverableFsDataOutputStream}}|https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.html]. It can be used to write data to a file system in a way that the writing can be resumed consistently after a failure and recovery without loss of data or possible duplication of bytes. The streams do not make the files they write to immediately visible, but instead write to temp files or other temporary storage. To publish the data atomically in the end, the stream offers the [{{RecoverableFsDataOutputStream.closeForCommit()}}|https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.html#closeForCommit--] method to create a committer that publishes the result.{quote} This atomicity is required, afaik, so writing to the final blob along the way wouldn't really work. (Though, as I mentioned above, I do think it would be possible to improve the efficiency of the blob composition somewhat in terms of total data storage required in cases where there are lots of files to be composed, even when using intermediate files.)
Re: *Optimization of the Recovery Mechanism:* Yes, it would be possible to delete some of the intermediate composed blobs along the way, i.e. any that are not "root" temporary blobs. They are currently deleted at the end of the commit. I suppose that doing it along the way would cause those blobs to live a _slightly_ shorter period of time and not overlap as much. So that I understand: which is the issue? Blob storage costs are prorated pretty granularly, so I wouldn't think that short window of time would affect costs much, but I could be wrong about that. Are you running into cost issues, total storage limits, or something else during the current bursts?

Re: *Handling the 5 TB Blob Limit:* Yes, it would be possible to write two (or more) blobs to stay under this limit when composing. I'm not completely sure what that means in terms of the atomicity requirement; I don't think there's any way to do multiple atomic writes in GCP, so there would be a possibility that data could appear and then disappear, e.g. if the write of final blob #1 succeeded and then the write of final blob #2 failed and both were deleted because of the failed commit. But I'm not sure what other options exist. It would be interesting to know how the S3 recoverable writer handles similar limits.

Re: *Rationale Behind Blob Composition:* see above.

Re: *TTL and temporary bucket:* Could you start without a defined TTL on the bucket while gathering the necessary data about the app, and then turn it on later? Or you can just leave it off entirely, in which case the temp files go into the main bucket, just in a different location.
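To make the multi-blob idea concrete: one way to stay under the object-size limit would be to partition the component blobs greedily into groups whose combined size fits under 5 TB, producing one final blob per group. The helper below is a hypothetical sketch, not part of the connector, and it does not resolve the atomicity concern raised above.

```python
# Sketch: greedily group component blob sizes so each final target blob
# stays under GCS's 5 TB object-size limit. Hypothetical helper only;
# it ignores the "multiple atomic writes" problem discussed above.
FIVE_TB = 5 * 1024**4  # bytes

def plan_targets(sizes, limit=FIVE_TB):
    """Partition `sizes` (bytes, in write order) into groups, each
    summing to at most `limit`. Returns a list of groups."""
    groups, current, current_size = [], [], 0
    for s in sizes:
        if s > limit:
            raise ValueError("single component exceeds the object-size limit")
        if current_size + s > limit:
            groups.append(current)
            current, current_size = [], 0
        current.append(s)
        current_size += s
    if current:
        groups.append(current)
    return groups

# e.g. seven 1.5 TB components would yield three final blobs (3 + 3 + 1)
TB = 1024**4
print([len(g) for g in plan_targets([int(1.5 * TB)] * 7)])
```

Order is preserved within and across groups, so concatenating the final blobs in sequence would still reproduce the original byte stream.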
[jira] [Commented] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs
[ https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827590#comment-17827590 ] Galen Warren commented on FLINK-34696: -- One more thing, regarding: {quote}To mitigate this problem, we propose modifying the `composeBlobs` method to immediately delete source blobs once they have been successfully combined. This change could significantly reduce data duplication and associated costs. {quote} I'm not sure it would be safe to delete the raw temporary blobs (i.e. the uncomposed ones) until the commit succeeds, because they are referenced in the Recoverable object and would need to be there if a commit were retried. I suppose it would be fine to delete truly intermediate blobs along the way during the composition process, but these are deleted anyway at the end of the commit, so does that buy much? Perhaps it does with very large files. > GSRecoverableWriterCommitter is generating excessive data blobs > --- > > Key: FLINK-34696 > URL: https://issues.apache.org/jira/browse/FLINK-34696 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Reporter: Simon-Shlomo Poil >Priority: Major > > The `composeBlobs` method in > `org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter` is designed to > merge multiple small blobs into a single large blob using Google Cloud > Storage's compose method. This process is iterative, combining the result > from the previous iteration with 31 new blobs until all blobs are merged. > Upon completion of the composition, the method proceeds to remove the > temporary blobs. > *Issue:* > This methodology results in significant, unnecessary data storage consumption > during the blob composition process, incurring considerable costs due to > Google Cloud Storage pricing models. > *Example to Illustrate the Problem:* > - Initial state: 64 blobs, each 1 GB in size (totaling 64 GB). 
> - After 1st step: 32 blobs are merged into a single blob, increasing total > storage to 96 GB (64 original + 32 GB new). > - After 2nd step: The newly created 32 GB blob is merged with 31 more blobs, > raising the total to 159 GB. > - After 3rd step: The final blob is merged, culminating in a total of 223 GB > to combine the original 64 GB of data. This results in an overhead of 159 GB. > *Impact:* > This inefficiency has a profound impact, especially at scale, where terabytes > of data can incur overheads in the petabyte range, leading to unexpectedly > high costs. Additionally, we have observed an increase in storage exceptions > thrown by the Google Storage library, potentially linked to this issue. > *Suggested Solution:* > To mitigate this problem, we propose modifying the `composeBlobs` method to > immediately delete source blobs once they have been successfully combined. > This change could significantly reduce data duplication and associated costs. > However, the implications for data recovery and integrity need careful > consideration to ensure that this optimization does not compromise the > ability to recover data in case of a failure during the composition process. > *Steps to Reproduce:* > 1. Initiate the blob composition process in an environment with a significant > number of blobs (e.g., 64 blobs of 1 GB each). > 2. Observe the temporary increase in data storage as blobs are iteratively > combined. > 3. Note the final amount of data storage used compared to the initial total > size of the blobs. > *Expected Behavior:* > The blob composition process should minimize unnecessary data storage use, > efficiently managing resources to combine blobs without generating excessive > temporary data overhead. > *Actual Behavior:* > The current implementation results in significant temporary increases in data > storage, leading to high costs and potential system instability due to > frequent storage exceptions. 
> > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
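The distinction drawn in the comment above – raw component blobs must outlive the commit because a retry replays composition from the complete list, while intermediate composed blobs can be rebuilt – can be sketched with a toy retry loop. This is illustrative Python, not the connector's code; the names and failure injection are invented for the example.

```python
# Toy model (not Flink's code): a commit that composes raw component
# blobs, fails once partway through, and is retried. Intermediates are
# discarded on failure and rebuilt; the raw list must stay intact.
def commit(raw_blobs, fail_on_attempt=1):
    attempts = 0
    while True:
        attempts += 1
        intermediates = []            # transient composed state
        try:
            combined = []
            for blob in raw_blobs:    # a retry needs the complete raw list
                combined.append(blob)
                intermediates.append(list(combined))
                if attempts == fail_on_attempt and len(combined) == 2:
                    raise RuntimeError("simulated StorageException")
            return attempts, combined
        except RuntimeError:
            intermediates.clear()     # safe to drop: rebuilt next attempt
            # deleting from raw_blobs here would make the retry impossible

print(commit(["part-0", "part-1", "part-2"]))
```

Deleting any element of `raw_blobs` inside the exception handler would make the second attempt produce an incomplete result, which is the unsafe early deletion the comment warns against.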
[jira] [Comment Edited] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs
[ https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827568#comment-17827568 ] Galen Warren edited comment on FLINK-34696 at 3/15/24 5:49 PM: --- No, this just moves the storage of the temporary files to a different bucket so that a lifecycle policy can be applied to them. The intermediate blobs still have to be composed into the final blob either way. And yes, there is one extra copy if you use the temp bucket, but the benefit is you won't orphan temporary files (more below). Upon rereading and looking back at the code (it's been a while!), I may have misunderstood the initial issue. There could be a couple things going on here:

1) There could be orphaned temporary blobs sticking around, because they were written at one point and then, due to restores from check/savepoints, they were "forgotten". Writing all intermediate files to a temporary bucket and then applying a lifecycle policy to that bucket would address that issue. It does come at the cost of one extra copy operation; in GCP, blobs cannot be composed into a different bucket, so using the temporary bucket incurs one extra copy that doesn't occur if the temporary bucket is the same as the final bucket.

2) There could be large numbers of temporary blobs being composed together, i.e. the writing pattern is such that large numbers of writes occur between checkpoints. Since there's a limit to how many blobs can be composed together at one time (32), there's no way to avoid a temporary blob's data being involved in more than one compose operation, in principle, but I do think the composition could be optimized differently.

I think #2 is the issue here, not #1? Though one should also be careful about orphaned blobs if state is ever restored from check/savepoints.
Regarding optimization: Currently, the composing of blobs occurs [here|https://github.com/apache/flink/blob/f6e1b493bd6292a87efd130a0e76af8bd750c1c9/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java#L131]. It does it in a way that minimizes the number of compose operations but not necessarily the total volume of data involved in those compose operations, which I think is the issue here.

Consider the situation where there are 94 temporary blobs to be composed into a committed blob. As it stands now, the first 32 would be composed into 1 blob, leaving 1 + (94 - 32) = 63 blobs remaining. This process would be repeated, composing the first 32 again into 1, leaving 1 + (63 - 32) = 32 blobs. Then these remaining 32 blobs would be composed into 1 final blob. So, 3 total compose operations.

This could be done another way – the first 32 blobs could be composed into 1 blob, the second 32 blobs into 1 blob, and the remaining 30 blobs into 1 blob, resulting in three intermediate blobs that would then be composed into the final blob. So, 4 total compose operations in this case. Still potentially recursive with large numbers of blobs due to the 32-blob limit, but this would avoid the piling up of data in the first blob when there are large numbers of temporary blobs. Even though the second method would generally involve more total compose operations than the first, it would minimize the total bytes being composed. I doubt the difference would be significant for small numbers of temporary blobs, but it could help with large numbers of temporary blobs. That seems like a reasonable enhancement to me.
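The two strategies can be compared numerically. The sketch below (illustrative only; sizes in GB, function names invented) counts compose operations and the total bytes flowing through compose for the current sequential fold vs. composing disjoint groups first. For the 64 × 1 GB example from the issue description, the grouped approach moves 128 GB through compose instead of 159 GB.

```python
# Compare total data passed through compose for the two strategies
# discussed above. Sizes are in GB; purely illustrative arithmetic.
LIMIT = 32  # GCS compose accepts at most 32 source objects

def sequential_fold(sizes):
    """Current approach: fold the running result with the next batch."""
    sizes = list(sizes)
    ops = bytes_composed = 0
    while len(sizes) > 1:
        batch, sizes = sizes[:LIMIT], sizes[LIMIT:]
        merged = sum(batch)
        ops += 1
        bytes_composed += merged
        sizes.insert(0, merged)      # result participates in the next round
    return ops, bytes_composed

def grouped_fold(sizes):
    """Alternative: compose disjoint groups of 32, then combine results."""
    sizes = list(sizes)
    ops = bytes_composed = 0
    while len(sizes) > 1:
        next_round = []
        for i in range(0, len(sizes), LIMIT):
            batch = sizes[i:i + LIMIT]
            if len(batch) == 1:
                next_round.append(batch[0])
                continue
            merged = sum(batch)
            ops += 1
            bytes_composed += merged
            next_round.append(merged)
        sizes = next_round
    return ops, bytes_composed

print(sequential_fold([1] * 64), grouped_fold([1] * 64))
print(sequential_fold([1] * 94), grouped_fold([1] * 94))
```

For 94 blobs this reproduces the 3-vs-4 operation counts from the comment above, with the grouped strategy composing slightly fewer total bytes; the gap widens as the blob count grows, since the sequential fold keeps re-composing an ever-larger running blob.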
[jira] [Commented] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs
[ https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827528#comment-17827528 ] Galen Warren commented on FLINK-34696: -- Yes, the issue is recoverability. However, there is one thing you can do. Create a separate bucket in GCP for temporary files and initialize the filesystem configuration (via FileSystem.initialize) with *gs.writer.temporary.bucket.name* set to the name of that bucket. This will cause the GSRecoverableWriter to write intermediate/temporary files to that bucket instead of the "real" bucket. Then, you can apply a TTL [lifecycle policy|https://cloud.google.com/storage/docs/lifecycle] to the temporary bucket to have files be deleted after whatever TTL you want. If you try to recover to a check/savepoint farther back in time than that TTL interval, the recovery will probably fail, but this will let you dial in whatever recoverability period you want, i.e. longer (at higher storage cost) or shorter (at lower storage cost). > GSRecoverableWriterCommitter is generating excessive data blobs > --- > > Key: FLINK-34696 > URL: https://issues.apache.org/jira/browse/FLINK-34696 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Reporter: Simon-Shlomo Poil >Priority: Major > > The `composeBlobs` method in > `org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter` is designed to > merge multiple small blobs into a single large blob using Google Cloud > Storage's compose method. This process is iterative, combining the result > from the previous iteration with 31 new blobs until all blobs are merged. > Upon completion of the composition, the method proceeds to remove the > temporary blobs. > *Issue:* > This methodology results in significant, unnecessary data storage consumption > during the blob composition process, incurring considerable costs due to > Google Cloud Storage pricing models. 
> *Example to Illustrate the Problem:* > - Initial state: 64 blobs, each 1 GB in size (totaling 64 GB). > - After 1st step: 32 blobs are merged into a single blob, increasing total > storage to 96 GB (64 original + 32 GB new). > - After 2nd step: The newly created 32 GB blob is merged with 31 more blobs, > raising the total to 159 GB. > - After 3rd step: The final blob is merged, culminating in a total of 223 GB > to combine the original 64 GB of data. This results in an overhead of 159 GB. > *Impact:* > This inefficiency has a profound impact, especially at scale, where terabytes > of data can incur overheads in the petabyte range, leading to unexpectedly > high costs. Additionally, we have observed an increase in storage exceptions > thrown by the Google Storage library, potentially linked to this issue. > *Suggested Solution:* > To mitigate this problem, we propose modifying the `composeBlobs` method to > immediately delete source blobs once they have been successfully combined. > This change could significantly reduce data duplication and associated costs. > However, the implications for data recovery and integrity need careful > consideration to ensure that this optimization does not compromise the > ability to recover data in case of a failure during the composition process. > *Steps to Reproduce:* > 1. Initiate the blob composition process in an environment with a significant > number of blobs (e.g., 64 blobs of 1 GB each). > 2. Observe the temporary increase in data storage as blobs are iteratively > combined. > 3. Note the final amount of data storage used compared to the initial total > size of the blobs. > *Expected Behavior:* > The blob composition process should minimize unnecessary data storage use, > efficiently managing resources to combine blobs without generating excessive > temporary data overhead. 
> *Actual Behavior:* > The current implementation results in significant temporary increases in data > storage, leading to high costs and potential system instability due to > frequent storage exceptions. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
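As an aside, the TTL lifecycle policy suggested in the comment above is just a small JSON document attached to the bucket. A minimal sketch that builds one (the 7-day TTL and the application via gsutil are illustrative, not prescriptive):

```python
import json


def lifecycle_policy(ttl_days):
    # GCS lifecycle rule: delete objects `ttl_days` days after creation.
    # Save the JSON to a file and apply it to the temporary bucket with, e.g.:
    #   gsutil lifecycle set lifecycle.json gs://<temporary-bucket>
    return {
        "rule": [
            {
                "action": {"type": "Delete"},
                "condition": {"age": ttl_days},  # age is measured in days
            }
        ]
    }


print(json.dumps(lifecycle_policy(7), indent=2))
```

As noted above, the TTL chosen here bounds how far back a check/savepoint restore can reach, so it should be set comfortably longer than the oldest checkpoint you expect to recover from.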
[jira] [Commented] (FLINK-31619) Upgrade Stateful Functions to Flink 1.16.1
[ https://issues.apache.org/jira/browse/FLINK-31619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17754199#comment-17754199 ] Galen Warren commented on FLINK-31619: -- Created pull request: [[FLINK-31619] Upgrade Stateful Functions to Flink 1.16.1 by galenwarren · Pull Request #331 · apache/flink-statefun (github.com)|https://github.com/apache/flink-statefun/pull/331] > Upgrade Stateful Functions to Flink 1.16.1 > -- > > Key: FLINK-31619 > URL: https://issues.apache.org/jira/browse/FLINK-31619 > Project: Flink > Issue Type: Improvement > Components: Stateful Functions >Reporter: nowke >Assignee: Galen Warren >Priority: Major > Labels: pull-request-available > > Upgrade Statefun to use Flink 1.16.1. Could you assign this to me? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-25993) Option to disable Kryo.setRegistrationRequired(false)
[ https://issues.apache.org/jira/browse/FLINK-25993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622344#comment-17622344 ] Galen Warren commented on FLINK-25993: -- [~chesnay] [~diagonalbishop] I found your old discussion thread on this topic: [https://lists.apache.org/thread/0bzro3qmmsqrz5kh7lmx3xwfyj8wl5rk] I'm also interested in this feature, and I have it working in a local fork. Would a PR be welcome here? Thanks. > Option to disable Kryo.setRegistrationRequired(false) > - > > Key: FLINK-25993 > URL: https://issues.apache.org/jira/browse/FLINK-25993 > Project: Flink > Issue Type: New Feature >Affects Versions: 1.14.3 >Reporter: Shane Bishop >Priority: Minor > > I would like to request a mechanism that a Flink library user could use to > optionally disable Kryo.setRegistrationRequired(false). > The motivation is that Kyro.setRegistrationRequired(true) was made the safe > default in [this > commit|https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00] > (specifically the change was [this > line|https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00#diff-6d4638ca49aa0d0d9171ff04a0faa22e241f8320fda4a8a12c95853188d055a0R130] > in the commit). This default is overriden in the 1.14.3 Flink release (see > [KryoSerializer.java|https://github.com/apache/flink/blob/release-1.14.3/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L492] > and > [FlinkScalaKryoInstantiator.scala|https://github.com/apache/flink/blob/release-1.14.3/flink-scala/src/main/scala/org/apache/flink/runtime/types/FlinkScalaKryoInstantiator.scala#L46]). > Reference to thread in mailing list: > https://lists.apache.org/thread/0bzro3qmmsqrz5kh7lmx3xwfyj8wl5rk -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-27813) java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0
[ https://issues.apache.org/jira/browse/FLINK-27813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17543637#comment-17543637 ] Galen Warren commented on FLINK-27813: -- Cool! Glad it was something simple. > java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0 > - > > Key: FLINK-27813 > URL: https://issues.apache.org/jira/browse/FLINK-27813 > Project: Flink > Issue Type: Bug > Components: API / State Processor >Affects Versions: statefun-3.2.0 >Reporter: Oleksandr Kazimirov >Priority: Blocker > Attachments: screenshot-1.png > > > Issue was met after migration from > flink-statefun:3.1.1-java11 > to > flink-statefun:3.2.0-java11 > > {code:java} > ts.execution-paymentManualValidationRequestEgress-egress, Sink: > ua.aval.payments.execution-norkomExecutionRequested-egress, Sink: > ua.aval.payments.execution-shareStatusToManualValidation-egress) (1/1)#15863 > (98a1f6bef9a435ef9d0292fefeb64022) switched from RUNNING to FAILED with > failure cause: java.lang.IllegalStateException: Unable to parse Netty > transport spec.\n\tat > org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.parseTransportSpec(NettyRequestReplyClientFactory.java:56)\n\tat > > org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.createTransportClient(NettyRequestReplyClientFactory.java:39)\n\tat > > org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider.functionOfType(HttpFunctionProvider.java:49)\n\tat > > org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:70)\n\tat > > org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:47)\n\tat > > org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:71)\n\tat > > org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)\n\tat > > 
org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:48)\n\tat > > org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:153)\n\tat > > org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:148)\n\tat > > org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)\n\tat > > org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)\n\tat > > org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)\n\tat > > org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)\n\tat > > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)\n\tat > > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)\n\tat > > org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:180)\n\tat > > org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)\n\tat > > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)\n\tat > > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)\n\tat > > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)\n\tat > > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)\n\tat > > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)\n\tat > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)\n\tat > > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)\n\tat 
> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)\n\tat > > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)\n\tat > > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)\n\tat > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)\n\tat > org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)\n\tat > java.base/java.lang.Thread.run(Unknown Source)\nCaused by: > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException: > Time interval unit label 'm' does not match any of the recognized units: > DAYS: (d | day | days), HOURS: (h | hour | hours), MINUTES: (min | minute |
[jira] [Commented] (FLINK-27813) java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0
[ https://issues.apache.org/jira/browse/FLINK-27813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17543099#comment-17543099 ] Galen Warren commented on FLINK-27813: -- Are you using RequestReplyFunctionBuilder to build your statefun job? > java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0 > - > > Key: FLINK-27813 > URL: https://issues.apache.org/jira/browse/FLINK-27813 > Project: Flink > Issue Type: Bug > Components: API / State Processor >Affects Versions: statefun-3.2.0 >Reporter: Oleksandr >Priority: Blocker > Attachments: screenshot-1.png > > > Issue was met after migration from > flink-statefun:3.1.1-java11 > to > flink-statefun:3.2.0-java11 > > {code:java} > ts.execution-paymentManualValidationRequestEgress-egress, Sink: > ua.aval.payments.execution-norkomExecutionRequested-egress, Sink: > ua.aval.payments.execution-shareStatusToManualValidation-egress) (1/1)#15863 > (98a1f6bef9a435ef9d0292fefeb64022) switched from RUNNING to FAILED with > failure cause: java.lang.IllegalStateException: Unable to parse Netty > transport spec.\n\tat > org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.parseTransportSpec(NettyRequestReplyClientFactory.java:56)\n\tat > > org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.createTransportClient(NettyRequestReplyClientFactory.java:39)\n\tat > > org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider.functionOfType(HttpFunctionProvider.java:49)\n\tat > > org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:70)\n\tat > > org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:47)\n\tat > > org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:71)\n\tat > > 
org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)\n\tat > > org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:48)\n\tat > > org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:153)\n\tat > > org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:148)\n\tat > > org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)\n\tat > > org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)\n\tat > > org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)\n\tat > > org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)\n\tat > > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)\n\tat > > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)\n\tat > > org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:180)\n\tat > > org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)\n\tat > > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)\n\tat > > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)\n\tat > > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)\n\tat > > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)\n\tat > > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)\n\tat > > 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)\n\tat > > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)\n\tat > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)\n\tat > > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)\n\tat > > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)\n\tat > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)\n\tat > org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)\n\tat > java.base/java.lang.Thread.run(Unknown Source)\nCaused by: > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException: > Time interval unit label 'm' does not match any of the recognized units: > DAYS: (d | day | days), HOURS: (h | hour | hours),
[jira] [Commented] (FLINK-27813) java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0
[ https://issues.apache.org/jira/browse/FLINK-27813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17542929#comment-17542929 ] Galen Warren commented on FLINK-27813: -- Hmm, not sure. The relevant part of the error message seems to be this: Time interval unit label 'm' does not match any of the recognized units: DAYS: (d | day | days), HOURS: (h | hour | hours), MINUTES: (min | minute | minutes), SECONDS: (s | sec | secs | second | seconds), MILLISECONDS: (ms | milli | millis | millisecond | milliseconds), MICROSECONDS: (µs | micro | micros | microsecond | microseconds), NANOSECONDS: (ns | nano | nanos | nanosecond | nanoseconds) Do you have a time unit in your spec labeled with an 'm'? > java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0 > - > > Key: FLINK-27813 > URL: https://issues.apache.org/jira/browse/FLINK-27813 > Project: Flink > Issue Type: Bug > Components: API / State Processor >Affects Versions: statefun-3.2.0 >Reporter: Oleksandr >Priority: Blocker > Attachments: screenshot-1.png > > > Issue was met after migration from > flink-statefun:3.1.1-java11 > to > flink-statefun:3.2.0-java8 > > {code:java} > ts.execution-paymentManualValidationRequestEgress-egress, Sink: > ua.aval.payments.execution-norkomExecutionRequested-egress, Sink: > ua.aval.payments.execution-shareStatusToManualValidation-egress) (1/1)#15863 > (98a1f6bef9a435ef9d0292fefeb64022) switched from RUNNING to FAILED with > failure cause: java.lang.IllegalStateException: Unable to parse Netty > transport spec.\n\tat > org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.parseTransportSpec(NettyRequestReplyClientFactory.java:56)\n\tat > > org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.createTransportClient(NettyRequestReplyClientFactory.java:39)\n\tat > > org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider.functionOfType(HttpFunctionProvider.java:49)\n\tat > > 
org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:70)\n\tat > > org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:47)\n\tat > > org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:71)\n\tat > > org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)\n\tat > > org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:48)\n\tat > > org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:153)\n\tat > > org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:148)\n\tat > > org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)\n\tat > > org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)\n\tat > > org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)\n\tat > > org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)\n\tat > > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)\n\tat > > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)\n\tat > > org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:180)\n\tat > > org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)\n\tat > > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)\n\tat > > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)\n\tat > > 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)\n\tat > > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)\n\tat > > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)\n\tat > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)\n\tat > > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)\n\tat > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)\n\tat > > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)\n\tat > >
[jira] [Commented] (FLINK-26462) Release Testing: Running Python UDF in different Execution Mode
[ https://issues.apache.org/jira/browse/FLINK-26462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17502469#comment-17502469 ] Galen Warren commented on FLINK-26462: -- I just ran through the testing steps and everything worked for me. Just one issue, I think it might be a typo in the setup steps. This command: $ python setup.py ... yields this error: {code:java} usage: setup.py [global_opts] cmd1 [cmd1_opts] [cmd2 [cmd2_opts] ...] or: setup.py --help [cmd1 cmd2 ...] or: setup.py --help-commands or: setup.py cmd --help error: no commands supplied {code} Running the next line: $ python setpy.py sdist ... succeeds. > Release Testing: Running Python UDF in different Execution Mode > --- > > Key: FLINK-26462 > URL: https://issues.apache.org/jira/browse/FLINK-26462 > Project: Flink > Issue Type: Improvement > Components: API / Python >Affects Versions: 1.15.0 >Reporter: Huang Xingbo >Priority: Blocker > Labels: release-testing > Fix For: 1.15.0 > > > h1. Setup > * Build flink source code and compile source code > {code:bash} > $ cd {flink-source-code} > $ mvn clean install -DskipTests > {code} > * Prepare a Python Virtual Environment > {code:bash} > $ cd flink-python/dev > $ ./lint-python.sh -s basic > $ source .conda/bin/activate > {code} > * Install PyFlink from source code. For more details, you can refer to the > [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink] > {code:bash} > $ cd flink-python/apache-flink-libraries > $ python setup.py sdist > $ pip install dist/*.tar.gz > $ cd .. > $ pip install -r dev/dev-requirements.txt > $ python setup.py > $ python setpy.py sdist > $ pip install dist/*.tar.gz > {code} > h1. 
Test > * Write a python udf job named demo.py in process mode > {code:python} > from pyflink.table.table_environment import TableEnvironment > from pyflink.table.environment_settings import EnvironmentSettings > from pyflink.table.udf import ScalarFunction, udf > from pyflink.table import DataTypes, expressions as expr > class SubtractOne(ScalarFunction): > def eval(self, i): > return i - 1 > @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], > result_type=DataTypes.BIGINT()) > def add(i, j): > return i + j > def main(): > t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode()) > # process mode ! > > t_env.get_config().get_configuration().set_string("python.execution-mode", > "process") > # optinal values > t_env.get_config().get_configuration().set_string("parallelism.default", > "2") > add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT()) > subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT()) > t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', > 'c']) > result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1)) > print(result.to_pandas()) > if __name__ == '__main__': > main() > {code} > * run the python udf job and watch the result > {code:bash} > $ python demo.py >_c0 c _c2 > 03 11 > 17 21 > 24 31 > {code} > * change the python udf job to multi-thread mode > {code:python} > from pyflink.table.table_environment import TableEnvironment > from pyflink.table.environment_settings import EnvironmentSettings > from pyflink.table.udf import ScalarFunction, udf > from pyflink.table import DataTypes, expressions as expr > class SubtractOne(ScalarFunction): > def eval(self, i): > return i - 1 > @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], > result_type=DataTypes.BIGINT()) > def add(i, j): > return i + j > def main(): > t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode()) > # multi-thread mode > > 
t_env.get_config().get_configuration().set_string("python.execution-mode", > "multi-thread") > t_env.get_config().get_configuration().set_string("parallelism.default", > "2") > add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT()) > subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT()) > t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', > 'c']) > result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1)) > print(result.to_pandas()) > if __name__ == '__main__': > main() > {code} > * run the python udf job and watch the result > {code:bash} > $ python demo.py >_c0 c _c2 > 03 11 > 17 21 > 24 31 > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26451) Test GCS FileSystem w/ RecoverableWriter manually
Galen Warren created FLINK-26451: Summary: Test GCS FileSystem w/ RecoverableWriter manually Key: FLINK-26451 URL: https://issues.apache.org/jira/browse/FLINK-26451 Project: Flink Issue Type: Improvement Components: FileSystems Reporter: Galen Warren Fix For: 1.15.0 This is a manual testing issue for [FLINK-11838|https://github.com/apache/flink/pull/15599]. The linked issue adds support for a Flink FileSystem backed by Google Cloud Storage (GCS). This FileSystem implementation includes RecoverableWriter support, which allows it to be used with [FileSystem connector|https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/] sinks. Suggested things to test: * Validate writing checkpoints and/or savepoints to a GCS bucket via a *gs://* endpoint * Validate reading from and writing to *gs://* endpoints via FileSystem connector sources and sinks Note that a Google Cloud account is required to access GCS buckets. Instructions for how to provide the necessary credentials are included in the documentation. Documentation: * https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/gcs/ -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26375) Fix Statefun Golang SDK to return nil from Context.Caller when there is no caller
[ https://issues.apache.org/jira/browse/FLINK-26375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498194#comment-17498194 ] Galen Warren commented on FLINK-26375: -- PR created: [[FLINK-26375][statefun-golang-sdk] Fix Statefun Golang SDK to return nil from Context.Caller when there is no caller by galenwarren · Pull Request #304 · apache/flink-statefun (github.com)|https://github.com/apache/flink-statefun/pull/304] > Fix Statefun Golang SDK to return nil from Context.Caller when there is no > caller > - > > Key: FLINK-26375 > URL: https://issues.apache.org/jira/browse/FLINK-26375 > Project: Flink > Issue Type: Bug > Components: Stateful Functions >Affects Versions: statefun-3.0.0, statefun-3.1.0, statefun-3.2.0, > statefun-3.1.1 >Reporter: Galen Warren >Priority: Minor > Labels: pull-request-available > Fix For: statefun-3.2.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > When a stateful function is invoked from an ingress – i.e. when there is no > upstream function caller -- Context.Caller() should return nil. Currently it > returns a pointer to a zero-value Address. This issue would fix that. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26375) Fix Statefun Golang SDK to return nil from Context.Caller when there is no caller
Galen Warren created FLINK-26375: Summary: Fix Statefun Golang SDK to return nil from Context.Caller when there is no caller Key: FLINK-26375 URL: https://issues.apache.org/jira/browse/FLINK-26375 Project: Flink Issue Type: Bug Components: Stateful Functions Affects Versions: statefun-3.1.1, statefun-3.2.0, statefun-3.1.0, statefun-3.0.0 Reporter: Galen Warren Fix For: statefun-3.2.0 When a stateful function is invoked from an ingress – i.e. when there is no upstream function caller -- Context.Caller() should return nil. Currently it returns a pointer to a zero-value Address. This issue would fix that. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26340) Add ability in Golang SDK to create new statefun.Context from existing one, but with a new underlying context.Context
[ https://issues.apache.org/jira/browse/FLINK-26340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17497745#comment-17497745 ] Galen Warren commented on FLINK-26340: -- Just a note for anyone who might arrive here – we changed the implementation in the PR to add a top-level DeriveContext function instead of changing the statefun.Context interface to add a WithContext method. Otherwise, the approach is the same. > Add ability in Golang SDK to create new statefun.Context from existing one, > but with a new underlying context.Context > - > > Key: FLINK-26340 > URL: https://issues.apache.org/jira/browse/FLINK-26340 > Project: Flink > Issue Type: Improvement > Components: Stateful Functions >Affects Versions: statefun-3.3.0 >Reporter: Galen Warren >Assignee: Galen Warren >Priority: Minor > Labels: pull-request-available > Original Estimate: 72h > Remaining Estimate: 72h > > In the Golang SDK, statefun.Context embeds the context.Context interface and > is implemented by the statefunContext struct, which embeds a context.Context. > To support common patterns in Golang related to adding values to context, it > would be useful to be able to create a derived statefun.Context that is > equivalent to the original in terms of statefun functionality but which wraps > a different context.Context. > The proposal is to add a: > WithContext(ctx context.Context) statefun.Context > ... method to the statefun.Context interface and implement it on > statefunContext. This method would return the derived statefun context. > This is a breaking change to statefun.Context, but, given its purpose, we do > not expect there to be implementations of this interface outside the Golang > SDK. > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26340) Add ability in Golang SDK to create new statefun.Context from existing one, but with a new underlying context.Context
[ https://issues.apache.org/jira/browse/FLINK-26340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17496903#comment-17496903 ] Galen Warren commented on FLINK-26340: -- PR created: [[FLINK-26340][statefun-golang-sdk] Add ability in Golang SDK to create new statefun.Context from existing one, but with a new underlying context.Context by galenwarren · Pull Request #303 · apache/flink-statefun (github.com)|https://github.com/apache/flink-statefun/pull/303] > Add ability in Golang SDK to create new statefun.Context from existing one, > but with a new underlying context.Context > > > Key: FLINK-26340 > URL: https://issues.apache.org/jira/browse/FLINK-26340 > Project: Flink > Issue Type: Improvement > Components: Stateful Functions > Affects Versions: statefun-3.3.0 > Reporter: Galen Warren > Priority: Minor > Labels: pull-request-available -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26340) Add ability in Golang SDK to create new statefun.Context from existing one, but with a new underlying context.Context
Galen Warren created FLINK-26340: Summary: Add ability in Golang SDK to create new statefun.Context from existing one, but with a new underlying context.Context Key: FLINK-26340 URL: https://issues.apache.org/jira/browse/FLINK-26340 Project: Flink Issue Type: Improvement Components: Stateful Functions Affects Versions: statefun-3.3.0 Reporter: Galen Warren In the Golang SDK, statefun.Context embeds the context.Context interface and is implemented by the statefunContext struct, which embeds a context.Context. To support common patterns in Golang related to adding values to context, it would be useful to be able to create a derived statefun.Context that is equivalent to the original in terms of statefun functionality but which wraps a different context.Context. The proposal is to add a: WithContext(ctx context.Context) statefun.Context ... method to the statefun.Context interface and implement it on statefunContext. This method would return the derived statefun context. This is a breaking change to statefun.Context, but, given its purpose, we do not expect there to be implementations of this interface outside the Golang SDK. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25933) Allow configuring different transports in RequestReplyFunctionBuilder
[ https://issues.apache.org/jira/browse/FLINK-25933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17489697#comment-17489697 ] Galen Warren commented on FLINK-25933: -- PR created: https://github.com/apache/flink-statefun/pull/300. > Allow configuring different transports in RequestReplyFunctionBuilder > > > Key: FLINK-25933 > URL: https://issues.apache.org/jira/browse/FLINK-25933 > Project: Flink > Issue Type: Improvement > Components: Stateful Functions > Reporter: Igal Shilman > Assignee: Galen Warren > Priority: Major > Labels: pull-request-available > > Currently it is not possible to configure the type of the transport used > while using the data stream integration. > It would be useful to do so. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (FLINK-25873) Using RocksDB state backend causes java.lang.ClassNotFoundException: org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream
[ https://issues.apache.org/jira/browse/FLINK-25873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Galen Warren closed FLINK-25873. Resolution: Not A Problem > Using RocksDB state backend causes java.lang.ClassNotFoundException: > org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream > --- > > Key: FLINK-25873 > URL: https://issues.apache.org/jira/browse/FLINK-25873 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.15.0 >Reporter: Galen Warren >Priority: Major > Fix For: 1.15.0 > > > From the current master branch, attempting to use the RocksDb state backend > yields this error when a checkpoint write is attempted: > {quote}{{Caused by: java.lang.NoClassDefFoundError: > org/apache/flink/runtime/state/CheckpointStreamFactory$CheckpointStateOutputStream}} > {{ at > org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.asyncSnapshot(RocksIncrementalSnapshotStrategy.java:180)}} > {{ at > org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.asyncSnapshot(RocksIncrementalSnapshotStrategy.java:83)}} > {{ at > org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:80)}} > {{ at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:551)}} > {{ at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:242)}} > {{ at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)}} > {{ at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)}} > {{ at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227)}} > {{ at > 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212)}} > {{ at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)}} > {{ at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:647)}} > {{ at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:320)}} > {{ at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1251)}} > {{ at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)}} > {{ at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1239)}} > {{ at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1196)}} > {{ at > org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)}} > {{ at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)}} > {{ at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)}} > {{ at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)}} > {{ at > org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)}} > {{ at > org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)}} > {{ at > 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)}} > {{ at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)}} > {{ at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)}} > {{ at > org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)}} > {{ at >
[jira] [Commented] (FLINK-25873) Using RocksDB state backend causes java.lang.ClassNotFoundException: org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream
[ https://issues.apache.org/jira/browse/FLINK-25873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17484241#comment-17484241 ] Galen Warren commented on FLINK-25873: -- Yes, this was a problem on my end. I hadn't removed the Scala suffix from `flink-statebackend-rocksdb` and was pulling in an old/different version. Sorry for the false alarm. This can be closed. > Using RocksDB state backend causes java.lang.ClassNotFoundException: > org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream > > > Key: FLINK-25873 > URL: https://issues.apache.org/jira/browse/FLINK-25873 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends > Affects Versions: 1.15.0 > Reporter: Galen Warren > Priority: Major > Fix For: 1.15.0 > > > From the current master branch, attempting to use the RocksDB state backend > yields this error when a checkpoint write is attempted: > {quote}{{Caused by: java.lang.NoClassDefFoundError: > org/apache/flink/runtime/state/CheckpointStreamFactory$CheckpointStateOutputStream}}{quote} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25873) Using RocksDB state backend causes java.lang.ClassNotFoundException: org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream
[ https://issues.apache.org/jira/browse/FLINK-25873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17484200#comment-17484200 ] Galen Warren commented on FLINK-25873: -- Actually, this may be a caching issue on my side. I'll report back after I investigate some more. > Using RocksDB state backend causes java.lang.ClassNotFoundException: > org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream > > > Key: FLINK-25873 > URL: https://issues.apache.org/jira/browse/FLINK-25873 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends > Affects Versions: 1.15.0 > Reporter: Galen Warren > Priority: Major > Fix For: 1.15.0 > > > From the current master branch, attempting to use the RocksDB state backend > yields this error when a checkpoint write is attempted: > {quote}{{Caused by: java.lang.NoClassDefFoundError: > org/apache/flink/runtime/state/CheckpointStreamFactory$CheckpointStateOutputStream}}{quote} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25873) Using RocksDB state backend causes java.lang.ClassNotFoundException: org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream
[ https://issues.apache.org/jira/browse/FLINK-25873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17483905#comment-17483905 ] Galen Warren commented on FLINK-25873: -- Sorry, link to current location of CheckpointStateOutputStream should be https://github.com/apache/flink/blob/e2f609c92918d669f2a1c0b69184878d691e3097/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateOutputStream.java#L43. > Using RocksDB state backend causes java.lang.ClassNotFoundException: > org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream > > > Key: FLINK-25873 > URL: https://issues.apache.org/jira/browse/FLINK-25873 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends > Affects Versions: 1.15.0 > Reporter: Galen Warren > Priority: Major > Fix For: 1.15.0 > > > From the current master branch, attempting to use the RocksDB state backend > yields this error when a checkpoint write is attempted: > {quote}{{Caused by: java.lang.NoClassDefFoundError: > org/apache/flink/runtime/state/CheckpointStreamFactory$CheckpointStateOutputStream}}{quote} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25873) Using RocksDB state backend causes java.lang.ClassNotFoundException: org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream
Galen Warren created FLINK-25873: Summary: Using RocksDB state backend causes java.lang.ClassNotFoundException: org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream Key: FLINK-25873 URL: https://issues.apache.org/jira/browse/FLINK-25873 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.15.0 Reporter: Galen Warren Fix For: 1.15.0 From the current master branch, attempting to use the RocksDB state backend yields this error when a checkpoint write is attempted: {quote}{{Caused by: java.lang.NoClassDefFoundError: org/apache/flink/runtime/state/CheckpointStreamFactory$CheckpointStateOutputStream}}{quote} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25577) Update GCS documentation
[ https://issues.apache.org/jira/browse/FLINK-25577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17483803#comment-17483803 ] Galen Warren commented on FLINK-25577: -- [~xtsong] Thanks! I've really enjoyed working with you as well and I appreciate all your help and guidance. I'll keep a lookout for any feedback during the testing period. Enjoy your holiday! > Update GCS documentation > > > Key: FLINK-25577 > URL: https://issues.apache.org/jira/browse/FLINK-25577 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / FileSystem, Documentation >Reporter: David Morávek >Assignee: Galen Warren >Priority: Blocker > Labels: pull-request-available > Fix For: 1.15.0 > > > Update GCS documentation with respect to: > - flink-shaded-hadoop artifacts and hadoop version 2.8.3 that are no longer > supported as of 1.15 > - The recoverable writer new feature > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/gcs/ -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25577) Update GCS documentation
[ https://issues.apache.org/jira/browse/FLINK-25577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17481473#comment-17481473 ] Galen Warren commented on FLINK-25577: -- Here's a second try at a release note: The `flink-gs-fs-hadoop` FileSystem plugin for Google Cloud Storage (GCS) has been introduced. This allows Flink to read data from and write data to GCS via paths with the `gs://` scheme, and it provides similar functionality for GCS as, for example, `flink-s3-fs-hadoop` provides for Amazon S3. In particular, this plugin supports the `RecoverableWriter` interface, which allows it to be used with file sinks. Under the hood, `flink-gs-fs-hadoop` uses Google's `gcs-connector` Hadoop library for basic read/write operations, and it uses Google's `google-cloud-storage` library to implement `RecoverableWriter` functionality. > Update GCS documentation > > > Key: FLINK-25577 > URL: https://issues.apache.org/jira/browse/FLINK-25577 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / FileSystem, Documentation > Reporter: David Morávek > Assignee: Galen Warren > Priority: Blocker > Labels: pull-request-available > Fix For: 1.15.0 > > > Update GCS documentation with respect to: > - flink-shaded-hadoop artifacts and hadoop version 2.8.3 that are no longer > supported as of 1.15 > - The recoverable writer new feature > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/gcs/ -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25790) Support authentication via core-site.xml in GCS FileSystem plugin
[ https://issues.apache.org/jira/browse/FLINK-25790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17481453#comment-17481453 ] Galen Warren commented on FLINK-25790: -- Created PR: https://github.com/apache/flink/pull/18489 > Support authentication via core-site.xml in GCS FileSystem plugin > > > Key: FLINK-25790 > URL: https://issues.apache.org/jira/browse/FLINK-25790 > Project: Flink > Issue Type: Improvement > Components: FileSystems > Affects Versions: 1.15.0 > Reporter: Galen Warren > Priority: Major > Labels: pull-request-available > > Add support for authentication via core-site.xml to the new GCS FileSystem > connector, recently added via [FLINK-11838] Create RecoverableWriter for GCS > - ASF JIRA (apache.org). > Specifically, make the RecoverableWriter use explicit credentials supplied in > core-site.xml in the "google.cloud.auth.service.account.json.keyfile" > property. Otherwise, it should use implicit credentials, as it already does. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25790) Support authentication via core-site.xml in GCS FileSystem plugin
Galen Warren created FLINK-25790: Summary: Support authentication via core-site.xml in GCS FileSystem plugin Key: FLINK-25790 URL: https://issues.apache.org/jira/browse/FLINK-25790 Project: Flink Issue Type: Improvement Components: FileSystems Affects Versions: 1.15.0 Reporter: Galen Warren Add support for authentication via core-site.xml to the new GCS FileSystem connector, recently added via [FLINK-11838] Create RecoverableWriter for GCS - ASF JIRA (apache.org). Specifically, make the RecoverableWriter use explicit credentials supplied in core-site.xml in the "google.cloud.auth.service.account.json.keyfile" property. Otherwise, it should use implicit credentials, as it already does. -- This message was sent by Atlassian Jira (v8.20.1#820001)
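For reference, the property named in the proposal would sit in core-site.xml along these lines (a sketch only; the keyfile path is a placeholder, not a value from the issue):

```xml
<configuration>
  <!-- Explicit service-account credentials for GCS. When this property
       is absent, the RecoverableWriter should fall back to implicit
       (application default) credentials, as it already does. -->
  <property>
    <name>google.cloud.auth.service.account.json.keyfile</name>
    <value>/path/to/service-account-key.json</value>
  </property>
</configuration>
```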
[jira] [Commented] (FLINK-25772) GCS filesystem fails license checker
[ https://issues.apache.org/jira/browse/FLINK-25772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17481128#comment-17481128 ] Galen Warren commented on FLINK-25772: -- I believe this is a duplicate of [FLINK-25758] GCS Filesystem implementation fails on Java 11 tests due to licensing issues - ASF JIRA (apache.org) which we're trying to address via [[FLINK-25758][flink-gs-fs-hadoop] Fix licensing issues by galenwarren · Pull Request #18452 · apache/flink (github.com)|https://github.com/apache/flink/pull/18452]. > GCS filesystem fails license checker > > > Key: FLINK-25772 > URL: https://issues.apache.org/jira/browse/FLINK-25772 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.15.0 >Reporter: Till Rohrmann >Priority: Blocker > Labels: test-stability > > FLINK-11838 made the {{LicenseChecker}} fail with the following exception > when compiling using Java 11. > {code} > 00:30:51,995 ERROR org.apache.flink.tools.ci.licensecheck.JarFileChecker > [] - File '/javax/annotation/security/package.html' in jar > '/tmp/flink-validation-deployment/org/apache/flink/flink-gs-fs-hadoop/1.15-SNAPSHOT/flink-gs-fs-hadoop-1.15-20220121.001624-1.jar' > contains match with forbidden regex 'gnu ?\R?[\s/#]*general > ?\R?[\s/#]*public ?\R?[\s/#]*license'. > 00:30:51,997 ERROR org.apache.flink.tools.ci.licensecheck.JarFileChecker > [] - File '/javax/annotation/package.html' in jar > '/tmp/flink-validation-deployment/org/apache/flink/flink-gs-fs-hadoop/1.15-SNAPSHOT/flink-gs-fs-hadoop-1.15-20220121.001624-1.jar' > contains match with forbidden regex 'gnu ?\R?[\s/#]*general > ?\R?[\s/#]*public ?\R?[\s/#]*license'. 
> 00:32:17,194 WARN org.apache.flink.tools.ci.licensecheck.LicenseChecker > [] - Found a total of 3 severe license issues > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=29841=logs=946871de-358d-5815-3994-8175615bc253=e0240c62-4570-5d1c-51af-dd63d2093da1 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25577) Update GCS documentation
[ https://issues.apache.org/jira/browse/FLINK-25577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17480732#comment-17480732 ] Galen Warren commented on FLINK-25577: -- [~MartijnVisser] I'm trying to add the release note, but I can't figure out how to set (or view) that field on this ticket. I'm wondering if I have the proper permissions? Sorry if I'm not seeing something obvious. I was thinking this for the content of the release note: {quote}Add the flink-gs-fs-hadoop FileSystem plugin for Google Cloud Storage, with RecoverableWriter support. {quote} > Update GCS documentation > > > Key: FLINK-25577 > URL: https://issues.apache.org/jira/browse/FLINK-25577 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / FileSystem, Documentation >Reporter: David Morávek >Assignee: Galen Warren >Priority: Blocker > Labels: pull-request-available > Fix For: 1.15.0 > > > Update GCS documentation with respect to: > - flink-shaded-hadoop artifacts and hadoop version 2.8.3 that are no longer > supported as of 1.15 > - The recoverable writer new feature > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/gcs/ -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25758) GCS Filesystem implementation fails on Java 11 tests due to licensing issues
[ https://issues.apache.org/jira/browse/FLINK-25758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17480728#comment-17480728 ] Galen Warren commented on FLINK-25758: -- PR created: [https://github.com/apache/flink/pull/18452] > GCS Filesystem implementation fails on Java 11 tests due to licensing issues > > > Key: FLINK-25758 > URL: https://issues.apache.org/jira/browse/FLINK-25758 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.15.0 >Reporter: Martijn Visser >Assignee: Galen Warren >Priority: Blocker > Labels: pull-request-available > > {code} > 00:33:45,410 DEBUG org.apache.flink.tools.ci.licensecheck.NoticeFileChecker > [] - Dependency io.netty:netty-common:4.1.51.Final is mentioned in NOTICE > file /__w/2/s/flink-python/src/main/resources/META-INF/NOTICE, but was not > mentioned by the build output as a bundled dependency > 00:33:45,411 ERROR org.apache.flink.tools.ci.licensecheck.NoticeFileChecker > [] - Could not find dependency javax.annotation:javax.annotation-api:1.3.2 > in NOTICE file > /__w/2/s/flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/NOTICE > 00:33:45,536 INFO org.apache.flink.tools.ci.licensecheck.JarFileChecker > [] - Checking directory /tmp/flink-validation-deployment with a total of > 197 jar files. > 00:34:18,554 ERROR org.apache.flink.tools.ci.licensecheck.JarFileChecker > [] - File '/javax/annotation/security/package.html' in jar > '/tmp/flink-validation-deployment/org/apache/flink/flink-gs-fs-hadoop/1.15-SNAPSHOT/flink-gs-fs-hadoop-1.15-20220122.001944-1.jar' > contains match with forbidden regex 'gnu ?\R?[\s/#]*general > ?\R?[\s/#]*public ?\R?[\s/#]*license'. 
> 00:34:18,555 ERROR org.apache.flink.tools.ci.licensecheck.JarFileChecker > [] - File '/javax/annotation/package.html' in jar > '/tmp/flink-validation-deployment/org/apache/flink/flink-gs-fs-hadoop/1.15-SNAPSHOT/flink-gs-fs-hadoop-1.15-20220122.001944-1.jar' > contains match with forbidden regex 'gnu ?\R?[\s/#]*general > ?\R?[\s/#]*public ?\R?[\s/#]*license'. > 00:35:46,612 WARN org.apache.flink.tools.ci.licensecheck.LicenseChecker > [] - Found a total of 3 severe license issues > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=29932=logs=946871de-358d-5815-3994-8175615bc253=e0240c62-4570-5d1c-51af-dd63d2093da1 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25758) GCS Filesystem implementation fails on Java 11 tests due to licensing issues
[ https://issues.apache.org/jira/browse/FLINK-25758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17480685#comment-17480685 ] Galen Warren commented on FLINK-25758: -- [~MartijnVisser] PR created: https://github.com/apache/flink/pull/18452 > GCS Filesystem implementation fails on Java 11 tests due to licensing issues > > > Key: FLINK-25758 > URL: https://issues.apache.org/jira/browse/FLINK-25758 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.15.0 >Reporter: Martijn Visser >Assignee: Galen Warren >Priority: Blocker > Labels: pull-request-available > > {code} > 00:33:45,410 DEBUG org.apache.flink.tools.ci.licensecheck.NoticeFileChecker > [] - Dependency io.netty:netty-common:4.1.51.Final is mentioned in NOTICE > file /__w/2/s/flink-python/src/main/resources/META-INF/NOTICE, but was not > mentioned by the build output as a bundled dependency > 00:33:45,411 ERROR org.apache.flink.tools.ci.licensecheck.NoticeFileChecker > [] - Could not find dependency javax.annotation:javax.annotation-api:1.3.2 > in NOTICE file > /__w/2/s/flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/NOTICE > 00:33:45,536 INFO org.apache.flink.tools.ci.licensecheck.JarFileChecker > [] - Checking directory /tmp/flink-validation-deployment with a total of > 197 jar files. > 00:34:18,554 ERROR org.apache.flink.tools.ci.licensecheck.JarFileChecker > [] - File '/javax/annotation/security/package.html' in jar > '/tmp/flink-validation-deployment/org/apache/flink/flink-gs-fs-hadoop/1.15-SNAPSHOT/flink-gs-fs-hadoop-1.15-20220122.001944-1.jar' > contains match with forbidden regex 'gnu ?\R?[\s/#]*general > ?\R?[\s/#]*public ?\R?[\s/#]*license'. 
> 00:34:18,555 ERROR org.apache.flink.tools.ci.licensecheck.JarFileChecker > [] - File '/javax/annotation/package.html' in jar > '/tmp/flink-validation-deployment/org/apache/flink/flink-gs-fs-hadoop/1.15-SNAPSHOT/flink-gs-fs-hadoop-1.15-20220122.001944-1.jar' > contains match with forbidden regex 'gnu ?\R?[\s/#]*general > ?\R?[\s/#]*public ?\R?[\s/#]*license'. > 00:35:46,612 WARN org.apache.flink.tools.ci.licensecheck.LicenseChecker > [] - Found a total of 3 severe license issues > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=29932=logs=946871de-358d-5815-3994-8175615bc253=e0240c62-4570-5d1c-51af-dd63d2093da1 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25758) GCS Filesystem implementation fails on Java 11 tests due to licensing issues
[ https://issues.apache.org/jira/browse/FLINK-25758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17480471#comment-17480471 ] Galen Warren commented on FLINK-25758: -- Sure, I'll take a look. Is there a way to run the license check locally? > GCS Filesystem implementation fails on Java 11 tests due to licensing issues > > > Key: FLINK-25758 > URL: https://issues.apache.org/jira/browse/FLINK-25758 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.15.0 >Reporter: Martijn Visser >Assignee: Galen Warren >Priority: Blocker > > {code} > 00:33:45,410 DEBUG org.apache.flink.tools.ci.licensecheck.NoticeFileChecker > [] - Dependency io.netty:netty-common:4.1.51.Final is mentioned in NOTICE > file /__w/2/s/flink-python/src/main/resources/META-INF/NOTICE, but was not > mentioned by the build output as a bundled dependency > 00:33:45,411 ERROR org.apache.flink.tools.ci.licensecheck.NoticeFileChecker > [] - Could not find dependency javax.annotation:javax.annotation-api:1.3.2 > in NOTICE file > /__w/2/s/flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/NOTICE > 00:33:45,536 INFO org.apache.flink.tools.ci.licensecheck.JarFileChecker > [] - Checking directory /tmp/flink-validation-deployment with a total of > 197 jar files. > 00:34:18,554 ERROR org.apache.flink.tools.ci.licensecheck.JarFileChecker > [] - File '/javax/annotation/security/package.html' in jar > '/tmp/flink-validation-deployment/org/apache/flink/flink-gs-fs-hadoop/1.15-SNAPSHOT/flink-gs-fs-hadoop-1.15-20220122.001944-1.jar' > contains match with forbidden regex 'gnu ?\R?[\s/#]*general > ?\R?[\s/#]*public ?\R?[\s/#]*license'. 
> 00:34:18,555 ERROR org.apache.flink.tools.ci.licensecheck.JarFileChecker > [] - File '/javax/annotation/package.html' in jar > '/tmp/flink-validation-deployment/org/apache/flink/flink-gs-fs-hadoop/1.15-SNAPSHOT/flink-gs-fs-hadoop-1.15-20220122.001944-1.jar' > contains match with forbidden regex 'gnu ?\R?[\s/#]*general > ?\R?[\s/#]*public ?\R?[\s/#]*license'. > 00:35:46,612 WARN org.apache.flink.tools.ci.licensecheck.LicenseChecker > [] - Found a total of 3 severe license issues > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=29932=logs=946871de-358d-5815-3994-8175615bc253=e0240c62-4570-5d1c-51af-dd63d2093da1 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25577) Update GCS documentation
[ https://issues.apache.org/jira/browse/FLINK-25577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17479647#comment-17479647 ] Galen Warren commented on FLINK-25577: -- [~MartijnVisser] [~dmvk] I've created a PR for this: [https://github.com/apache/flink/pull/18430] There is still a bit more work to do on the docs, but I wanted to get the PR started. Also, I have a question in the PR for someone who knows how the docs work. > Update GCS documentation > > > Key: FLINK-25577 > URL: https://issues.apache.org/jira/browse/FLINK-25577 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / FileSystem, Documentation >Reporter: David Morávek >Assignee: Galen Warren >Priority: Blocker > Labels: pull-request-available > Fix For: 1.15.0 > > > Update GCS documentation with respect to: > - flink-shaded-hadoop artifacts and hadoop version 2.8.3 that are no longer > supported as of 1.15 > - The recoverable writer new feature > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/gcs/ -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17479454#comment-17479454 ] Galen Warren commented on FLINK-11838: -- [~MartijnVisser] I'd say let's handle documentation and release notes on the other one, https://issues.apache.org/jira/browse/FLINK-25577, since this one is now closed. Sorry if this is a dumb question, but what is involved in adding this to the release notes? > Create RecoverableWriter for GCS > > > Key: FLINK-11838 > URL: https://issues.apache.org/jira/browse/FLINK-11838 > Project: Flink > Issue Type: New Feature > Components: Connectors / FileSystem >Affects Versions: 1.8.0 >Reporter: Fokko Driesprong >Assignee: Galen Warren >Priority: Major > Labels: pull-request-available, usability > Fix For: 1.15.0 > > Time Spent: 20m > Remaining Estimate: 0h > > GCS supports the resumable upload which we can use to create a Recoverable > writer similar to the S3 implementation: > https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload > After using the Hadoop compatible interface: > https://github.com/apache/flink/pull/7519 > We've noticed that the current implementation relies heavily on the renaming > of the files on the commit: > https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259 > This is suboptimal on an object store such as GCS. Therefore we would like to > implement a more GCS native RecoverableWriter -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25577) Update GCS documentation not to use no longer supported flink-shaded-hadoop artifacts
[ https://issues.apache.org/jira/browse/FLINK-25577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17470705#comment-17470705 ] Galen Warren commented on FLINK-25577: -- Why don't we leave it open, just so that it doesn't get lost in the event something goes horribly wrong with the PR that is in progress. But I think that's unlikely. Once we complete that PR, I can come back here and update the status and then it could be closed. What do you think? > Update GCS documentation not to use no longer supported flink-shaded-hadoop > artifacts > - > > Key: FLINK-25577 > URL: https://issues.apache.org/jira/browse/FLINK-25577 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / FileSystem, Documentation >Reporter: David Morávek >Priority: Blocker > Fix For: 1.15.0 > > > The GCS documentation refers to no longer supported artifacts. Also 2.8.3 > version of Hadoop will be no longer supported as of 1.15. > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/gcs/ -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25577) Update GCS documentation not to use no longer supported flink-shaded-hadoop artifacts
[ https://issues.apache.org/jira/browse/FLINK-25577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17470701#comment-17470701 ] Galen Warren commented on FLINK-25577: -- I just wanted to mention that it's likely that a GCS FileSystem plugin will land in the next release, via [FLINK-11838] Create RecoverableWriter for GCS - ASF JIRA (apache.org). This would be similar to the S3 plugins that are documented here: [Amazon S3 | Apache Flink|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/]. There will presumably be similar documentation for the GCS plugin as part of this effort. The PR in progress is here: [[FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support by galenwarren · Pull Request #15599 · apache/flink (github.com)|https://github.com/apache/flink/pull/15599]. > Update GCS documentation not to use no longer supported flink-shaded-hadoop > artifacts > - > > Key: FLINK-25577 > URL: https://issues.apache.org/jira/browse/FLINK-25577 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / FileSystem, Documentation >Reporter: David Morávek >Priority: Blocker > Fix For: 1.15.0 > > > The GCS documentation refers to no longer supported artifacts. Also 2.8.3 > version of Hadoop will be no longer supported as of 1.15. > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/gcs/ -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25197) Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type `java.time.Duration` not supported by default: add Module "org.apache.flink.shaded.jackson2
[ https://issues.apache.org/jira/browse/FLINK-25197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17465402#comment-17465402 ] Galen Warren commented on FLINK-25197: -- PR created: https://github.com/apache/flink-statefun/pull/282 > Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type > `java.time.Duration` not supported by default: add Module > "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310" > to enable handling > --- > > Key: FLINK-25197 > URL: https://issues.apache.org/jira/browse/FLINK-25197 > Project: Flink > Issue Type: Bug > Components: Stateful Functions >Affects Versions: statefun-3.1.0 >Reporter: Galen Warren >Assignee: Galen Warren >Priority: Major > Labels: pull-request-available > Fix For: statefun-3.1.0 > > > When using RequestReplyFunctionBuilder to build a stateful functions job, the > job fails at runtime with: > Java 8 date/time type `java.time.Duration` not supported by default: add > Module > "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310" > to enable handling > It appears this is because, in > [RequestReplyFunctionBuilder::transportClientPropertiesAsObjectNode|https://github.com/apache/flink-statefun/blob/b4ba9547b8f0105a28544fd28a5e0433666e9023/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java#L127], > a default instance of ObjectMapper is used to serialize the client > properties, which now include a java.time.Duration. There is a > [StateFunObjectMapper|https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java] > class in the project that has customized serde support, but it is not used > here. 
> The fix seems to be to: > * Use an instance of StateFunObjectMapper to serialize the client properties > in RequestReplyFunctionBuilder > * Modify StateFunObjectMapper to both serialize and deserialize instances of > java.time.Duration (currently, only deserialization is supported) > I've made these changes locally and it seems to fix the problem. Would you be > interested in a PR? Thanks. > -- This message was sent by Atlassian Jira (v8.20.1#820001)
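The failure mode described above — a default JSON mapper that has no handler registered for a duration type — can be illustrated with a small Python sketch. This is an analogy, not the Statefun code: Jackson addresses it by registering the jsr310 module or a custom serializer on the mapper, which is what the ticket proposes for StateFunObjectMapper. The ISO-8601-style `PT30S` output format and the `duration_default` helper are illustrative choices, not taken from the issue.

```python
import json
from datetime import timedelta

def duration_default(obj):
    # Analogous to registering a Duration serializer on the mapper:
    # without a handler, json.dumps raises TypeError for timedelta,
    # just as a default Jackson ObjectMapper rejects java.time.Duration
    # until the jsr310 module (or a custom serializer) is registered.
    if isinstance(obj, timedelta):
        return "PT%dS" % int(obj.total_seconds())  # ISO-8601 style, e.g. PT30S
    raise TypeError("Object of type %s is not JSON serializable" % type(obj).__name__)

props = {"timeouts": {"call": timedelta(seconds=30)}}

# The default mapper fails, mirroring the runtime error in the ticket:
try:
    json.dumps(props)
except TypeError:
    pass  # expected: no handler registered for timedelta

# A mapper with a registered handler succeeds:
print(json.dumps(props, default=duration_default))  # {"timeouts": {"call": "PT30S"}}
```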
[jira] [Commented] (FLINK-25197) Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type `java.time.Duration` not supported by default: add Module "org.apache.flink.shaded.jackson2
[ https://issues.apache.org/jira/browse/FLINK-25197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17465400#comment-17465400 ] Galen Warren commented on FLINK-25197: -- PR created: https://github.com/apache/flink-statefun/pull/282 > Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type > `java.time.Duration` not supported by default: add Module > "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310" > to enable handling > --- > > Key: FLINK-25197 > URL: https://issues.apache.org/jira/browse/FLINK-25197 > Project: Flink > Issue Type: Bug > Components: Stateful Functions >Affects Versions: statefun-3.1.0 >Reporter: Galen Warren >Assignee: Galen Warren >Priority: Major > Labels: pull-request-available > Fix For: statefun-3.1.0 > > > When using RequestReplyFunctionBuilder to build a stateful functions job, the > job fails at runtime with: > Java 8 date/time type `java.time.Duration` not supported by default: add > Module > "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310" > to enable handling > It appears this is because, in > [RequestReplyFunctionBuilder::transportClientPropertiesAsObjectNode|https://github.com/apache/flink-statefun/blob/b4ba9547b8f0105a28544fd28a5e0433666e9023/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java#L127], > a default instance of ObjectMapper is used to serialize the client > properties, which now include a java.time.Duration. There is a > [StateFunObjectMapper|https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java] > class in the project that has customized serde support, but it is not used > here. 
> The fix seems to be to: > * Use an instance of StateFunObjectMapper to serialize the client properties > in RequestReplyFunctionBuilder > * Modify StateFunObjectMapper to both serialize and deserialize instances of > java.time.Duration (currently, only deserialization is supported) > I've made these changes locally and it seems to fix the problem. Would you be > interested in a PR? Thanks. > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25197) Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type `java.time.Duration` not supported by default: add Module "org.apache.flink.shaded.jackson2.c
[ https://issues.apache.org/jira/browse/FLINK-25197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Galen Warren updated FLINK-25197: - Description: When using RequestReplyFunctionBuilder to build a stateful functions job, the job fails at runtime with: Java 8 date/time type `java.time.Duration` not supported by default: add Module "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310" to enable handling It appears this is because, in [RequestReplyFunctionBuilder::transportClientPropertiesAsObjectNode|https://github.com/apache/flink-statefun/blob/b4ba9547b8f0105a28544fd28a5e0433666e9023/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java#L127], a default instance of ObjectMapper is used to serialize the client properties, which now include a java.time.Duration. There is a [StateFunObjectMapper|https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java] class in the project that has customized serde support, but it is not used here. The fix seems to be to: * Use an instance of StateFunObjectMapper to serialize the client properties in RequestReplyFunctionBuilder * Modify StateFunObjectMapper to both serialize and deserialize instances of java.time.Duration (currently, only deserialization is supported) I've made these changes locally and it seems to fix the problem. Would you be interested in a PR? Thanks. 
was: When using RequestReplyFunctionBuilder to build a stateful functions job, the job fails at runtime with: Java 8 date/time type `java.time.Duration` not supported by default: add Module "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310" to enable handling It appears this is because, in [RequestReplyFunctionBuilder::transportClientPropertiesAsObjectNode|https://github.com/apache/flink-statefun/blob/b4ba9547b8f0105a28544fd28a5e0433666e9023/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java#L127], a default instance of ObjectMapper is used to serialize the client properties, which now include a java.time.Duration. There is a [StateFunObjectMapper|https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java] class in the project that has customized serde support, but it is not used here. The fix seems to be to: * Use an instance of StateFunObjectMapper to serialize the client properties in RequestReplyFunctionBuilder * Modify StateFunObjecdtMapper to both serialize and deserialize instances of java.time.Duration (currently, only deserialization is supported) I've made these changes locally and it seems to fix the problem. Would you be interested in a PR? Thanks. 
> Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type > `java.time.Duration` not supported by default: add Module > "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310" > to enable handling > --- > > Key: FLINK-25197 > URL: https://issues.apache.org/jira/browse/FLINK-25197 > Project: Flink > Issue Type: Bug > Components: Stateful Functions >Affects Versions: statefun-3.1.0 >Reporter: Galen Warren >Priority: Major > Fix For: statefun-3.1.0 > > > When using RequestReplyFunctionBuilder to build a stateful functions job, the > job fails at runtime with: > Java 8 date/time type `java.time.Duration` not supported by default: add > Module > "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310" > to enable handling > It appears this is because, in > [RequestReplyFunctionBuilder::transportClientPropertiesAsObjectNode|https://github.com/apache/flink-statefun/blob/b4ba9547b8f0105a28544fd28a5e0433666e9023/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java#L127], > a default instance of ObjectMapper is used to serialize the client > properties, which now include a java.time.Duration. There is a > [StateFunObjectMapper|https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java] > class in the project that has customized serde support, but it is not used >
[jira] [Created] (FLINK-25197) Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type `java.time.Duration` not supported by default: add Module "org.apache.flink.shaded.jackson2.c
Galen Warren created FLINK-25197: Summary: Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type `java.time.Duration` not supported by default: add Module "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310" to enable handling Key: FLINK-25197 URL: https://issues.apache.org/jira/browse/FLINK-25197 Project: Flink Issue Type: Bug Components: Stateful Functions Affects Versions: statefun-3.1.0 Reporter: Galen Warren Fix For: statefun-3.1.0 When using RequestReplyFunctionBuilder to build a stateful functions job, the job fails at runtime with: Java 8 date/time type `java.time.Duration` not supported by default: add Module "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310" to enable handling It appears this is because, in [RequestReplyFunctionBuilder::transportClientPropertiesAsObjectNode|https://github.com/apache/flink-statefun/blob/b4ba9547b8f0105a28544fd28a5e0433666e9023/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java#L127], a default instance of ObjectMapper is used to serialize the client properties, which now include a java.time.Duration. There is a [StateFunObjectMapper|https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java] class in the project that has customized serde support, but it is not used here. The fix seems to be to: * Use an instance of StateFunObjectMapper to serialize the client properties in RequestReplyFunctionBuilder * Modify StateFunObjectMapper to both serialize and deserialize instances of java.time.Duration (currently, only deserialization is supported) I've made these changes locally and it seems to fix the problem. Would you be interested in a PR? Thanks. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17398108#comment-17398108 ] Galen Warren commented on FLINK-11838: -- I know this has taken a long time. I'll have the code in before the upcoming 1.14 freeze. > Create RecoverableWriter for GCS > > > Key: FLINK-11838 > URL: https://issues.apache.org/jira/browse/FLINK-11838 > Project: Flink > Issue Type: New Feature > Components: Connectors / FileSystem >Affects Versions: 1.8.0 >Reporter: Fokko Driesprong >Assignee: Galen Warren >Priority: Major > Labels: pull-request-available, stale-assigned, usability > Fix For: 1.14.0 > > Time Spent: 20m > Remaining Estimate: 0h > > GCS supports the resumable upload which we can use to create a Recoverable > writer similar to the S3 implementation: > https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload > After using the Hadoop compatible interface: > https://github.com/apache/flink/pull/7519 > We've noticed that the current implementation relies heavily on the renaming > of the files on the commit: > https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259 > This is suboptimal on an object store such as GCS. Therefore we would like to > implement a more GCS native RecoverableWriter -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22757) Update GCS documentation
[ https://issues.apache.org/jira/browse/FLINK-22757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350139#comment-17350139 ] Galen Warren commented on FLINK-22757: -- In case it's relevant here, I'm working on a [PR|https://github.com/apache/flink/pull/15599] related to GCS support in Flink. There will be some new documentation as part of this. > Update GCS documentation > > > Key: FLINK-22757 > URL: https://issues.apache.org/jira/browse/FLINK-22757 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Ankush Khanna >Priority: Minor > > Currently, GCS filesystem documentation points to > [https://cloud.google.com/dataproc.] This does cover the correct way to > connect to GCS. > Following from this [blog > post|https://www.ververica.com/blog/getting-started-with-da-platform-on-google-kubernetes-engine] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19481) Add support for a flink native GCS FileSystem
[ https://issues.apache.org/jira/browse/FLINK-19481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17342087#comment-17342087 ] Galen Warren commented on FLINK-19481: -- I wanted to check in here. Should I wait until this question is resolved before proceeding with the PR? Personally, my preference would be to see Flink HadoopFileSystem + GoogleHadoopFileSystem as at least _an_ option for the file system implementation, just because those components seem to be well established. I'm not opposed to an alternate implementation, though, i.e. as has been done with S3. If that's the path we're going down, it might mean some changes for the code in the PR I'm working on, hence the question. > Add support for a flink native GCS FileSystem > - > > Key: FLINK-19481 > URL: https://issues.apache.org/jira/browse/FLINK-19481 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem, FileSystems >Affects Versions: 1.12.0 >Reporter: Ben Augarten >Priority: Minor > Labels: auto-deprioritized-major > > Currently, GCS is supported but only by using the hadoop connector[1] > > The objective of this improvement is to add support for checkpointing to > Google Cloud Storage with the Flink File System, > > This would allow the `gs://` scheme to be used for savepointing and > checkpointing. Long term, it would be nice if we could use the GCS FileSystem > as a source and sink in flink jobs as well. > > Long term, I hope that implementing a flink native GCS FileSystem will > simplify usage of GCS because the hadoop FileSystem ends up bringing in many > unshaded dependencies. > > [1] > [https://github.com/GoogleCloudDataproc/hadoop-connectors|https://github.com/GoogleCloudDataproc/hadoop-connectors)] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19481) Add support for a flink native GCS FileSystem
[ https://issues.apache.org/jira/browse/FLINK-19481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17338418#comment-17338418 ] Galen Warren commented on FLINK-19481: -- Hi all, I'm the author of the other [PR|https://github.com/apache/flink/pull/15599] that relates to Google Cloud Storage. [~xintongsong] has been working with me on this. The main goal of my PR is to add support for the RecoverableWriter interface, so that one can write to GCS via a StreamingFileSink. The file system support goes through the Hadoop stack, as noted above, using Google's [cloud storage connector|https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage]. I have not personally had problems using the GCS connector and the Hadoop stack – it seems to write check/savepoints properly. I also use it to write job manager HA data to GCS, which seems to work fine. However, if we do want to support a native implementation in addition to the Hadoop-based one, we could approach it similarly to what has been done for S3, i.e. have a shared base project (flink-gs-fs-base?) and then projects for each of the implementations ( flink-gs-fs-hadoop and flink-gs-fs-native?). The recoverable-writer code could go into the shared project so that both of the implementations could use it (assuming that the native implementation doesn't already have a recoverable-writer implementation). I'll defer to the Flink experts on whether that's a worthwhile effort or not. At this point, from my perspective, it wouldn't be that much work to rework the project structure to support this. 
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17326990#comment-17326990 ] Galen Warren commented on FLINK-11838: -- PR is still active. > Create RecoverableWriter for GCS > > > Key: FLINK-11838 > URL: https://issues.apache.org/jira/browse/FLINK-11838 > Project: Flink > Issue Type: New Feature > Components: Connectors / FileSystem >Affects Versions: 1.8.0 >Reporter: Fokko Driesprong >Assignee: Galen Warren >Priority: Major > Labels: pull-request-available, stale-assigned, usability > Fix For: 1.13.0 > > Time Spent: 20m > Remaining Estimate: 0h > > GCS supports the resumable upload which we can use to create a Recoverable > writer similar to the S3 implementation: > https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload > After using the Hadoop compatible interface: > https://github.com/apache/flink/pull/7519 > We've noticed that the current implementation relies heavily on the renaming > of the files on the commit: > https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259 > This is suboptimal on an object store such as GCS. Therefore we would like to > implement a more GCS native RecoverableWriter -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17320363#comment-17320363 ] Galen Warren commented on FLINK-11838: -- [~xintongsong], PR created: [https://github.com/apache/flink/pull/15599]
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17316376#comment-17316376 ] Galen Warren commented on FLINK-11838: -- Working on the new PR now ... coming soon.
[jira] [Comment Edited] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17314079#comment-17314079 ] Galen Warren edited comment on FLINK-11838 at 4/2/21, 8:54 PM: --- [~xintongsong] – actually, is that the right thing to do, start a new PR? I'm thinking that might be the cleanest way to proceed, since we never really used the old one that I created prematurely, and the info in the old one is out of date, but I'll follow your guidance here on what to do. The old PR is [here|https://github.com/apache/flink/pull/14875].
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17314067#comment-17314067 ] Galen Warren commented on FLINK-11838: -- Hi [~xintongsong] – sorry for the long delay. I plan to create a new PR and upload some code this weekend; my other work has let up a bit and I have some time to look at this. I'll post the link here when it's ready.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17303537#comment-17303537 ] Galen Warren commented on FLINK-11838: -- Not that I'm aware of. Writing checkpoints/savepoints and HA data to GCS buckets works fine as-is, but to use StreamingFileSink, a RecoverableWriter implementation is required. That exists now for S3 but not GCS. I have a local GCS RecoverableWriter implementation I'm using, which seems to be working, and I'm hoping to push the first part of the code for review by [~xintongsong] this weekend.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300332#comment-17300332 ] Galen Warren commented on FLINK-11838: -- [~BerkayOzturk] No, it's not in a working state. Since that PR, [~xintongsong] and I refined the proposed implementation significantly. I'm hoping to get back to this very soon; I've had some work stuff come up that is taking all my time. Sorry about the delay!
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17295384#comment-17295384 ] Galen Warren commented on FLINK-11838: -- I just wanted to add an update here so this doesn't go stale – I should be able to push the first piece of the code in a PR in about a week, once this work project I'm on dies down a bit. So this is still coming, sorry for the delay.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17288419#comment-17288419 ] Galen Warren commented on FLINK-11838: -- Hey, I just wanted to check in here. Sorry for the delay; some other work stuff has come up and I haven't been able to spend any time here. But I will still complete this, and will hopefully get back to it toward the end of this week.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17284896#comment-17284896 ] Galen Warren commented on FLINK-11838: -- Ah, OK – I see. It will take me a couple of days to get things organized; I'll post back here when I have something for you to look at. Thanks.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17284751#comment-17284751 ] Galen Warren commented on FLINK-11838: -- That's fine on "compose on persist", I'll leave it out for now and supply it in a separate commit if we decide to go there. This reminds me of a question I meant to ask, though. When we first started, you had some concern about the size of the original PR and suggested it be broken into parts. The new PR will be roughly the same size – if that's too big, any suggestion on how to break it into pieces? Should the unit tests be a separate commit?
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17284476#comment-17284476 ] Galen Warren commented on FLINK-11838: -- {quote}Maybe for the first step, it's good enough to simply do all the composing and deleting at the end. We can try to optimize it later if we indeed see a performance problem in composing and deleting the temporary blobs. {quote} I'm fine to go either way here. I've already put something together locally that allows for composing both at persist and commit, but it would be simple to revert to just doing it at commit. Maybe you can take a look when we get to the code phase to see what you think? If it's not obvious which is better, I suppose we could also control that – "compose on persist" – via a Flink option. Are you comfortable with the approach now? If so, I'll work on getting the code together in order to update the PR. Thanks!
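If "compose on persist" were made configurable, the option could be set alongside the other filesystem settings in flink-conf.yaml. A hypothetical fragment (the key name below is invented for illustration; it is not an actual Flink configuration option):

```yaml
# Hypothetical option, for illustration only – not part of Flink's
# actual configuration surface. When true, temp blobs would be composed
# incrementally at persist() calls rather than only at commit.
gs.writer.compose-on-persist: true
```

Leaving the option unset would fall back to composing only at commit, matching the simpler behavior discussed above.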
[jira] [Comment Edited] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17283950#comment-17283950 ] Galen Warren edited comment on FLINK-11838 at 2/12/21, 9:01 PM: One more thought/question. While we can potentially compose temp blobs along the way, to avoid having to compose them all at commit time, it seems to me that we can't safely delete any of the temporary blobs along the way, because it's possible that we might restore to a checkpoint prior to some or all of the incremental compose operations having occurred. In that case, we'd have to repeat the compose operations, which means the underlying temp blobs would need to be there. If that's right, then we'd necessarily have to wait to the end to delete the temp blobs. I was wondering, would it be allowable to do any of those delete operations in parallel? The coding [guidelines|https://flink.apache.org/contributing/code-style-and-quality-common.html#concurrency-and-threading] would seem to discourage this, but they don't outright prohibit it, so I thought I'd ask first. EDIT: Never mind regarding the parallelism question; I didn't notice that the GCS API provides a method to delete several blobs in a single call, which I think will suffice.
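Deferring all deletes to commit means the temp blob names accumulate until then, and the multi-object delete mentioned in the edit can then be issued over them in groups. A rough sketch of the grouping step in plain Java (hypothetical helper, not Flink or GCS client code; the batch size of 100 is an assumption for illustration):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: partition the temp blob names accumulated until
// commit into fixed-size groups, so each group can be handed to a single
// multi-object delete call. Purely illustrative of the batching arithmetic.
public class TempBlobDeletePlanner {

    public static List<List<String>> deleteBatches(List<String> tempBlobNames, int batchSize) {
        List<List<String>> batches = new ArrayList<>();
        for (int i = 0; i < tempBlobNames.size(); i += batchSize) {
            int end = Math.min(i + batchSize, tempBlobNames.size());
            // Copy the sublist so each batch is independent of the source list.
            batches.add(new ArrayList<>(tempBlobNames.subList(i, end)));
        }
        return batches;
    }
}
```

Each resulting batch would map to one delete call, so the number of round trips grows with the blob count divided by the batch size rather than with the blob count itself.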
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17283747#comment-17283747 ] Galen Warren commented on FLINK-11838: -- Hi [~xintongsong], sorry for the delay here, I had some other, unrelated work I had to focus on. I like your idea of using {{WriteChannel}} for the uploads, but to close each one when {{RecoverableFsDataOutputStream.persist}} is called, so that we're not relying on {{WriteChannel}} for recoverability. {{WriteChannel}} allows one to control how frequently it flushes data via [setChunkSize|https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#setChunkSize-int-], perhaps we expose this chunk size as a Flink option to give the user some control over the process, i.e. how much memory is used for buffering? It could be optional, not setting it would mean to use the Google default. Yes, it would be straightforward to compose blobs at any point in the process, i.e. on commit and/or at {{persist}} calls along the way. Maybe we compose them on commit (of course) and also whenever {{persist}} is called when there are at least 32 temp blobs to be composed? That way, we spread the compose work out over time but also call {{compose}} as few times as possible, composing as many blobs as possible in each call, which seems like it would be the most efficient. Shall we go with this approach? 
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281769#comment-17281769 ] Galen Warren commented on FLINK-11838: -- Hi [~xintongsong] – a couple more thoughts here. I think I understand why the code in [BlobWriteChannel|https://github.com/googleapis/java-storage/blob/master/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java] and [BaseWriteChannel|https://github.com/googleapis/java-core/blob/master/google-cloud-core/src/main/java/com/google/cloud/BaseWriteChannel.java] is more complicated than I expected; there is a [requirement|https://cloud.google.com/storage/docs/performing-resumable-uploads] in resumable uploads that all the uploaded chunks (except the last one) be a multiple of 256 KiB, and so the writers have to buffer data in order to meet that requirement. The RestorableState then, in general, contains some amount of buffered data, i.e. data that has been written but that has not yet been sent as part of an upload chunk. Also, I came across a somewhat disturbing [thread|https://issuetracker.google.com/issues/137168102] from last year, where a GCS bug was being discussed that, essentially, caused in-progress resumable uploads to disappear and return 410 GONE errors. Such a failure would obviously be a big problem for the Flink use case we're talking about here. That thread claims this bug is fixed as of July 2020; however, it got me thinking about alternative implementations, especially since they provided this guidance: {quote}To retry successfully, catching 500 and 410 errors is required and, as the official documentation recommends ([https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone]), implementing a retry by starting a new session for the upload that received an unsuccessful status code but still needs uploading. {quote} which we would be unable to follow. So here's a different implementation idea. 
What if we didn't use resumable uploads at all, and instead just uploaded data in a series of temporary blobs, each of which would be uploaded via a normal, nonresumable upload? The [RecoverableFsDataOutputStream|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.html] could buffer written data and write temporary blobs at various points (flush, sync, persist, close, or even write, if the buffered size exceeds some threshold). In this approach, the recoverable state would include a list of all the temporary blobs uploaded as part of this write, and on commit the temporary blobs would be combined into one blob and copied to the final blob location. To combine the temp blobs into the final blob, we could use the [compose|https://cloud.google.com/storage/docs/composing-objects] feature of GCS, which allows combining up to 32 blobs into one in a single call, and which could compose an arbitrary number of blobs with a pretty simple algorithm. A few additional potential benefits of this approach:
* Each temp blob write would be a normal, nonresumable upload with known contents and size at the time of upload, so we could use CRC checks as recommended [here|https://cloud.google.com/storage/docs/hashes-etags].
* We'd sidestep the one-week limitation; the lifetime of temporary blobs could be made indefinite or managed via bucket policy.
* We would be in full control of the recoverable state, so we'd avoid any issues related to Java serialization.
In this approach, we'd probably need a Flink option to set the maximum size of a temporary blob, which would control how frequently temporary blobs would be created. Tuning this would involve a tradeoff between memory usage and checkpoint size on the Flink side vs. 
time to commit the writes via the recoverable writer, as I'd expect it to take longer to compose a larger number of smaller temporary blobs together into the final blob than it would a smaller number of larger temporary blobs. Anyway, just a thought – let me know what you think. And enjoy the New Year! :)
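The "pretty simple algorithm" for composing an arbitrary number of blobs could be sketched like this. Everything below is hypothetical: {{composeBatch}} stands in for a single call to the GCS compose feature, and the class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Sketch: merge any number of blobs by repeatedly composing batches of up to 32. */
class ComposeAll {
    static final int MAX_COMPOSE = 32; // per-call limit of the GCS compose feature

    static String composeAll(List<String> blobs, Function<List<String>, String> composeBatch) {
        List<String> current = new ArrayList<>(blobs);
        while (current.size() > 1) {
            List<String> next = new ArrayList<>();
            for (int i = 0; i < current.size(); i += MAX_COMPOSE) {
                List<String> batch =
                        current.subList(i, Math.min(i + MAX_COMPOSE, current.size()));
                // a single-element "batch" needs no compose call
                next.add(batch.size() == 1
                        ? batch.get(0)
                        : composeBatch.apply(new ArrayList<>(batch)));
            }
            current = next;
        }
        return current.get(0);
    }
}
```

For example, 50 temp blobs would need two rounds: one pass composing batches of 32 and 18, then one final compose of the two intermediate blobs, i.e. three compose calls in total.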
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281481#comment-17281481 ] Galen Warren commented on FLINK-11838: -- Thanks, that all sounds reasonable. I'll take a look at the REST option. I'm going to be a bit tied up with some work-related stuff for the next couple of days so I might not get back until later in the week, but I'll definitely get back on this soon.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17280587#comment-17280587 ] Galen Warren commented on FLINK-11838: -- Thanks. All of your initial assumptions are correct, though I might add that it is straightforward to serde a RestorableState if Java serialization is allowed (i.e. the object is guaranteed to implement Serializable). But I understand that's not preferred. To answer your questions: *Does "writing a blob to a temporary location" mean that the user always needs to configure a temporary location? How is the temporary location cleaned, say if they're never moved to the committed location?* I was thinking to manage temporary locations through a couple of Flink options:
* First, a "prefix" option, which would be a string to be applied in front of the supplied, "final" blob name. This prefix could by default be something like ".inprogress/".
* Second, an option for the bucket to use for temp blobs, which would not be required. If not supplied, the same bucket would be used for temp blobs as for the associated final blobs.
I was also planning on appending a UUID string to the end of temp blob locations, in order to guarantee their uniqueness during the temp-blob writing phase, in case somehow multiple writes to the same final blob were in progress at the same time. So, with the defaults, a write to a blob {{gs://final_bucket/foo/bar}} would use a temp blob location of, say, {{gs://final_bucket/.inprogress/foo/bar/1157f422-32af-4e32-a797-2a0a05f28ecf}}. The prefix could be configured via the first option; also, if the user wanted to write all temp blobs to a "temp_bucket" bucket, that bucket could be specified via the second option, yielding {{gs://temp_bucket/.inprogress/foo/bar/1157f422-32af-4e32-a797-2a0a05f28ecf}}. 
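As a rough illustration of the naming scheme described in this comment, a hypothetical helper (the option names, defaults, and method shape are my own invention, not from any actual implementation):

```java
import java.util.UUID;

/** Sketch of temp-blob naming: optional temp bucket, configurable prefix, UUID suffix. */
class TempBlobNaming {
    static final String DEFAULT_PREFIX = ".inprogress/";

    static String tempBlobUri(String finalBucket, String finalName,
                              String prefix, String tempBucketOrNull) {
        // fall back to the final blob's bucket when no temp bucket is configured
        String bucket = (tempBucketOrNull != null) ? tempBucketOrNull : finalBucket;
        // the UUID keeps concurrent writes to the same final blob from colliding
        return "gs://" + bucket + "/" + prefix + finalName + "/" + UUID.randomUUID();
    }
}
```

With the defaults this yields locations like {{gs://final_bucket/.inprogress/foo/bar/<uuid>}}, and supplying a temp bucket redirects only the bucket portion, matching the examples above.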
As for cleaning the temporary blobs, is that what [RecoverableWriter.cleanupRecoverableState|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html#cleanupRecoverableState-org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable-] does? I was thinking to clean up any temporary blobs there. Beyond that, if a separate bucket for temporary blobs were used, I suppose one could apply an [object lifecycle-management rule|https://cloud.google.com/storage/docs/lifecycle] that would delete blobs after some period of time. These rules look to be applicable only at the bucket level, so this would only work if a separate bucket were used just for temporary blobs. *Per [this doc|https://cloud.google.com/storage/docs/resumable-uploads], a resumable upload must be completed within a week. This could be surprising for the users, if they try to restore a job from checkpoints/savepoints after pausing for more than one week.* Yes, I would propose to disclose this limitation via a disclaimer, similar to the one used for S3: {quote}Important Note 2: To guarantee exactly-once semantics while being efficient, the {{StreamingFileSink}} uses the [Multi-part Upload|https://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html] feature of S3 (MPU from now on). This feature allows to upload files in independent chunks (thus the "multi-part") which can be combined into the original file when all the parts of the MPU are successfully uploaded. For inactive MPUs, S3 supports a bucket lifecycle rule that the user can use to abort multipart uploads that don't complete within a specified number of days after being initiated. This implies that if you set this rule aggressively and take a savepoint with some part-files being not fully uploaded, their associated MPUs may time-out before the job is restarted. 
This will result in your job not being able to restore from that savepoint as the pending part-files are no longer there and Flink will fail with an exception as it tries to fetch them and fails. {quote} Admittedly, it does seem that S3 provides more configuration options here than GCS. It would be nice if the week limit were configurable, but it doesn't seem to be, based on my read. *Relying on Java serialization means making our compatibility depend on the compatibility of GCS, which should be avoided if possible. Would it be possible to directly work with the REST API and session URI? IIUC this is how the write channel internally works.* I'd need to look into it more closely, but yes, I think this could be possible. I think we'd wind up reimplementing much of what is done in [BlobWriteChannel|https://github.com/googleapis/java-storage/blob/master/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java] and [BaseWriteChannel|https://github.com/googleapis/java-core/blob/master/google-cloud-core/src/main/java/com/google/cloud/BaseWriteChannel.java], which I suppose I was thinking would be good to avoid. The code in the BlobWriteChannel
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17279694#comment-17279694 ] Galen Warren commented on FLINK-11838: -- Thanks [~xtsong]! I really appreciate your help on this. Sorry if I jumped the gun on the PR, I'm happy to follow the process you've outlined, and if it makes more sense to ultimately submit the PR in multiple pieces instead of one, that's fine with me too. So it would seem that the next step would be to discuss the proposed design here. I'll take a crack at that. At a high level, the goal here is to implement RecoverableWriter for GCS, in order to be able to use StreamingFileSink to write to GCS. In Flink, recoverable writers are created by calling [FileSystem.createRecoverableWriter|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/FileSystem.html#createRecoverableWriter--], so we will also need an implementation of org.apache.flink.core.fs.FileSystem for GCS in order to expose the recoverable writer to Flink. The org.apache.flink.core.fs.FileSystem implementation is straightforward. Google provides an implementation of org.apache.hadoop.fs.FileSystem for GCS via [GoogleHadoopFileSystem|https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystem.java], which can already be used in Flink in other contexts, e.g. for checkpoints. Flink provides [HadoopFileSystem|https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.html], which implements org.apache.flink.core.fs.FileSystem in terms of an underlying org.apache.hadoop.fs.FileSystem. So, the proposed FileSystem for GCS would extend HadoopFileSystem, constructing it with an instance of GoogleHadoopFileSystem. This new FileSystem class would also override createRecoverableWriter to return a RecoverableWriter implementation for GCS. 
We also need an implementation of [FileSystemFactory|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/FileSystemFactory.html] and register it with SPI to expose the GCS FileSystem to Flink. So, next is the interesting part – the GCS [RecoverableWriter|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html] implementation. At a high level, RecoverableWriter allows one to create an output stream, write bytes to that stream, persist and recover the state of that stream, and ultimately commit the write operation or abort it. In GCS, I propose to do recoverable writes in two steps:
* First, write a blob to a temporary location using the [resumable uploads|https://cloud.google.com/storage/docs/resumable-uploads] feature of GCS. This allows for blobs to be written in chunks over a potentially long period of time (up to one week, per the docs). Importantly, it also allows for the write state to be persisted and recovered, via [WriteChannel.capture|https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#capture--] and [RestorableState.restore|https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/RestorableState.html], which we'll use to implement the persist/recover functionality in RecoverableWriter.
* Second, commit the write operation by copying the temporary blob to the "final" blob location, i.e. the one specified in the initial call to RecoverableWriter.open, and deleting the temporary blob. In the event of an aborted upload, the cleanup would consist of deleting just the temporary blob.
So, in this approach, the recoverable state from Flink's perspective (i.e. 
CommitRecoverable and ResumeRecoverable) would include:
* The RestorableState returned from WriteChannel
* The write position (we have to keep track of this ourselves, because WriteChannel doesn't expose this)
* The locations of the temporary and final blobs, so that we can ultimately commit or clean up the overall operation
That's basically it at a high level. I do want to point out one possible conflict with the Flink coding guidelines, though, to get your thoughts. The guidelines say – very emphatically! – not to use Java serialization for anything. In the GCS case, the RestorableState that is returned from WriteChannel.capture is an object that implements Serializable but is otherwise opaque. This object does need to be serialized/deserialized as part of the RecoverableWriter implementation, and it's not clear to me how to do that except by using Java serialization. I'll stop there for now; please let me know if this is the sort of information you're looking for here. I'm happy to drill into any area in more detail if you'd like, just let me know.
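For what it's worth, serializing the opaque RestorableState would presumably come down to a plain Java-serialization round trip along these lines. This is a generic sketch: nothing here is GCS-specific, and the helper names are made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Sketch: round-trip an opaque Serializable (e.g. a RestorableState) via Java serialization. */
class OpaqueSerde {
    static byte[] serialize(Serializable state) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(state); // the object's own Serializable implementation does the work
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }
}
```

This is exactly the pattern the coding guidelines discourage, which is the conflict the comment raises: since RestorableState is opaque, there is no obvious field-by-field alternative.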
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17279023#comment-17279023 ] Galen Warren commented on FLINK-11838: -- I have submitted a new [PR|https://github.com/apache/flink/pull/14875] for this, unrelated to the prior ones, and [~trohrmann] suggested I reuse this Jira ticket.
[jira] [Commented] (FLINK-18810) Golang remote functions SDK
[ https://issues.apache.org/jira/browse/FLINK-18810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17201123#comment-17201123 ] Galen Warren commented on FLINK-18810: -- I'd also love to see a Golang SDK and would be willing to contribute here. Will check out [https://github.com/sjwiesman/statefun-go] ... > Golang remote functions SDK > --- > > Key: FLINK-18810 > URL: https://issues.apache.org/jira/browse/FLINK-18810 > Project: Flink > Issue Type: New Feature > Components: Stateful Functions >Reporter: Francesco Guardiani >Priority: Trivial > > Hi, > I was wondering if there's already some WIP for a Golang SDK to create remote > functions. If not, I'm willing to give it a try. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions
[ https://issues.apache.org/jira/browse/FLINK-19176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17196268#comment-17196268 ] Galen Warren commented on FLINK-19176: -- I just created a PR: [https://github.com/apache/flink-statefun/pull/152]. > Support ScalaPB as a message payload serializer in Stateful Functions > - > > Key: FLINK-19176 > URL: https://issues.apache.org/jira/browse/FLINK-19176 > Project: Flink > Issue Type: Improvement > Components: Stateful Functions >Affects Versions: 2.0.0 >Reporter: Galen Warren >Priority: Major > Labels: pull-request-available > Fix For: statefun-2.1.0 > > > Currently, Stateful Functions supports four options for serialization of > message payloads: > * Protobuf (based on code generated for Java) > * Kryo > * Multilanguage > * Raw > This proposal is to add a fifth option to this list, to support serialization > of message payloads based on [ScalaPB |[https://scalapb.github.io/docs/]]. > This would allow Scala developers to use ScalaPB-generated classes as message > types in Stateful Functions directly as message types, without having to > convert to/from code generated for Java and/or raw byte[] messages. > This would be a simple implementation, as there is already a > [MessagePayloadSerializer > |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java]] > interface that is implemented for each of the existing serialization > methods; supporting ScalaPB would require a new class implementing > MessagePayloadSerializer in terms of ScalaPB-generated code, and a means to > select it, by adding a new value to the [MessageFactoryType > |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java]]enumeration. 
> Plus testing :) > I've done this already locally, the implementation is very similar to the > existing [MessagePayloadSerializerPb > |[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java]]implementation. > It uses a reference to ScalaPB in "provided" scope. > Would you be interested in a pull request to add this? Primary benefit is to > make it easy to use Stateful Functions in a Scala environment. Thanks. > > > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions
[ https://issues.apache.org/jira/browse/FLINK-19176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17195635#comment-17195635 ] Galen Warren commented on FLINK-19176: -- Thanks! I'll get to work on that.
[jira] [Commented] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions
[ https://issues.apache.org/jira/browse/FLINK-19176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17195564#comment-17195564 ] Galen Warren commented on FLINK-19176: -- Thanks [~igal]. Yep, that's almost exactly how I approached it! I thought of MessageFactoryKey as needing to contain two pieces of information – the existing MessageFactoryType (with one new value added to the enumeration, USES_CUSTOM_PAYLOADS) and a string field (customPayloadSerializerName) that contains the name of the custom payload serializer class, which would be used only with USES_CUSTOM_PAYLOADS. However, one question. Several classes that currently have MessageFactoryType as a field are serializable, and so changing the type of that field would break serialization backward compatibility if nothing else were done. MessageTypeSerializer is an example of such a class. I can see a few options for how to proceed:
* Just replace MessageFactoryType with MessageFactoryKey anyway, bump the serialization uid and accept the incompatibility
* Replace MessageFactoryType with MessageFactoryKey, bump the serialization uid, and implement custom Java serialization so that the new versions of the classes can deserialize serialized data from the old version of the classes with the different field
* Use MessageFactoryKey for parameters everywhere but, in serializable classes that currently contain MessageFactoryType, leave MessageFactoryType as a field and also add customPayloadSerializerName as a field. This should allow both the new and old versions of the class to read the same serialized data and not require a bump of the version uid.
Does that sound right, and if so, any recommendations on how to proceed? Thanks. 
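The key type described in this comment might look roughly like the sketch below. This is a hypothetical shape only; the real class layouts in statefun may differ, and the enum values shown are abbreviated.

```java
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;

/** Abbreviated stand-in for the existing enumeration, plus the proposed new value. */
enum MessageFactoryType { WITH_PROTOBUF_PAYLOADS, WITH_KRYO_PAYLOADS, USES_CUSTOM_PAYLOADS }

/** Sketch of MessageFactoryKey: the existing type plus an optional serializer class name. */
final class MessageFactoryKey implements Serializable {
    private static final long serialVersionUID = 1L;

    private final MessageFactoryType type;
    private final String customPayloadSerializerName; // null unless USES_CUSTOM_PAYLOADS

    private MessageFactoryKey(MessageFactoryType type, String customPayloadSerializerName) {
        this.type = type;
        this.customPayloadSerializerName = customPayloadSerializerName;
    }

    static MessageFactoryKey forType(MessageFactoryType type, String customPayloadSerializerName) {
        return new MessageFactoryKey(type, customPayloadSerializerName);
    }

    MessageFactoryType getType() {
        return type;
    }

    Optional<String> getCustomPayloadSerializerClassName() {
        return Optional.ofNullable(customPayloadSerializerName);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof MessageFactoryKey)) return false;
        MessageFactoryKey other = (MessageFactoryKey) o;
        return type == other.type
                && Objects.equals(customPayloadSerializerName, other.customPayloadSerializerName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(type, customPayloadSerializerName);
    }
}
```

Under the third option above, serializable classes would keep their existing MessageFactoryType field, add the serializer-name string as a second field, and build a key like this on demand rather than storing it.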
[jira] [Commented] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions
[ https://issues.apache.org/jira/browse/FLINK-19176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17194832#comment-17194832 ] Galen Warren commented on FLINK-19176: -- Hi [~igal] - I've forked the project and taken a crack at the approach based on a pluggable MessagePayloadSerializer. It seems to work in my project. I haven't completely run through the tape yet on unit tests; before I do, I thought I'd check if you (or someone) would like to validate the approach. Should I create a pull request? Thanks, Galen
[jira] [Commented] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions
[ https://issues.apache.org/jira/browse/FLINK-19176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17194243#comment-17194243 ] Galen Warren commented on FLINK-19176: -- Thanks for the quick reply! A few comments/questions:
{quote}Unfortunately adding ScalaPB payload serializer to the project directly (even with runtime scope) will be very hard to get a consensus for, since maintaining different Scala versions is problematic and we have experienced that in Flink in many different instances.
{quote}
Obviously, I'll defer to your judgment as to what could achieve consensus here. It does look to me like StateFun already has a Scala dependency; the additional ScalaPB dependency that would be added here (in statefun-flink-core) would just reference the existing Scala dependency defined in statefun-parent, i.e.:
{noformat}
<dependency>
  <groupId>com.thesamet.scalapb</groupId>
  <artifactId>scalapb-runtime_${scala.binary.version}</artifactId>
  <version>${scalapb.version}</version>
  <scope>provided</scope>
</dependency>
{noformat}
So it wouldn't be adding a new Scala dependency, but perhaps even referencing the existing Scala dependency is undesirable. I do have a working version of this locally; I'd be happy to share the code with you if you want to see exactly what it might look like.
{quote}I think that the long term solution should be a Scala native remote SDK, and I would be perfectly happy to help kicking off this effort if you are interested :)
{quote}
This sounds interesting, but I don't quite understand what this would entail – would you mind elaborating a bit?
{quote}Meanwhile, if you would like to use ScalaPB for the embedded functions, then we would need to support a pluggable MessagePayloadSerializer, that would be picked up at runtime from the class-path. If you are interested in working on that, I can try to outline the steps needed to do that.
{quote}
This sounds promising to me!
Are you suggesting that, instead of having to choose among the predefined serializers (Protobuf, Kryo, etc.), one could alternatively provide the fully qualified name of a MessagePayloadSerializer class – e.g. com.mydomain.MySerializer – in the StateFun configuration, and MessageFactory would then create an instance of that class whenever a serializer is needed? That sounds like a great option to me. I'd be happy to work on that and would appreciate any direction you could provide. Thanks.
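A minimal sketch of what that runtime pickup could look like: a serializer class is named in configuration and instantiated via reflection. Everything here (PayloadSerializer, Utf8Serializer, the load helper) is a hypothetical stand-in for illustration, not the actual StateFun API:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for StateFun's MessagePayloadSerializer interface.
interface PayloadSerializer {
    byte[] serialize(Object payload);
    Object deserialize(byte[] bytes);
}

// Example user-provided implementation, analogous to a ScalaPB-backed one.
class Utf8Serializer implements PayloadSerializer {
    public byte[] serialize(Object payload) {
        return payload.toString().getBytes(StandardCharsets.UTF_8);
    }
    public Object deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

public class PluggableSerializerDemo {
    // Load a serializer by fully qualified class name, as a pluggable
    // MessageFactory might do when reading the class name from config.
    static PayloadSerializer load(String className) throws Exception {
        return (PayloadSerializer)
            Class.forName(className).getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        PayloadSerializer s = load("Utf8Serializer");
        byte[] bytes = s.serialize("hello");
        System.out.println(s.deserialize(bytes)); // prints "hello"
    }
}
```

The class only needs a no-arg constructor and must be on the class-path at runtime, which is what makes it possible to keep the ScalaPB dependency out of statefun-flink-core itself.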
[jira] [Commented] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions
[ https://issues.apache.org/jira/browse/FLINK-19176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193217#comment-17193217 ] Galen Warren commented on FLINK-19176: -- Sorry for the edits – I'm trying to get hyperlinks to appear correctly.
[jira] [Updated] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions
[ https://issues.apache.org/jira/browse/FLINK-19176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Galen Warren updated FLINK-19176: - Description: Currently, Stateful Functions supports four options for serialization of message payloads:
 * Protobuf (based on code generated for Java)
 * Kryo
 * Multilanguage
 * Raw

This proposal adds a fifth option: serialization of message payloads based on [ScalaPB|https://scalapb.github.io/docs/]. This would allow Scala developers to use ScalaPB-generated classes directly as message types in Stateful Functions, without having to convert to/from code generated for Java and/or raw byte[] messages. This would be a simple implementation, as there is already a [MessagePayloadSerializer|https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java] interface that is implemented for each of the existing serialization methods; supporting ScalaPB would require a new class implementing MessagePayloadSerializer in terms of ScalaPB-generated code, plus a means to select it, by adding a new value to the [MessageFactoryType|https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java] enumeration. Plus testing :) I've done this already locally; the implementation is very similar to the existing [MessagePayloadSerializerPb|https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java] implementation. It uses a reference to ScalaPB in "provided" scope. Would you be interested in a pull request to add this? The primary benefit is making it easy to use Stateful Functions in a Scala environment. Thanks.
[jira] [Created] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions
Galen Warren created FLINK-19176: Summary: Support ScalaPB as a message payload serializer in Stateful Functions Key: FLINK-19176 URL: https://issues.apache.org/jira/browse/FLINK-19176 Project: Flink Issue Type: Improvement Components: Stateful Functions Affects Versions: 2.0.0 Reporter: Galen Warren Fix For: statefun-2.1.0

Currently, Stateful Functions supports four options for serialization of message payloads:
 * Protobuf (based on code generated for Java)
 * Kryo
 * Multilanguage
 * Raw

This proposal adds a fifth option: serialization of message payloads based on [ScalaPB|https://scalapb.github.io/docs/]. This would allow Scala developers to use ScalaPB-generated classes directly as message types in Stateful Functions, without having to convert to/from code generated for Java and/or raw byte[] messages.

This would be a simple implementation, as there is already a [MessagePayloadSerializer|https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java] interface that is implemented for each of the existing serialization methods; supporting ScalaPB would require a new class implementing MessagePayloadSerializer in terms of ScalaPB-generated code, plus a means to select it, by adding a new value to the [MessageFactoryType|https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java] enumeration. Plus testing :)

I've done this already locally; the implementation is very similar to the existing [MessagePayloadSerializerPb|https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java] implementation. It uses a reference to ScalaPB in "provided" scope.

Would you be interested in a pull request to add this? The primary benefit is making it easy to use Stateful Functions in a Scala environment. Thanks.
[jira] [Created] (FLINK-11055) Allow Queryable State to be transformed on the TaskManager before being returned to the client
Galen Warren created FLINK-11055: Summary: Allow Queryable State to be transformed on the TaskManager before being returned to the client Key: FLINK-11055 URL: https://issues.apache.org/jira/browse/FLINK-11055 Project: Flink Issue Type: Improvement Components: Queryable State Reporter: Galen Warren Fix For: 1.7.0

The proposal here is to enhance the way Queryable State works to allow the state object to be transformed on the TaskManager before being returned to the client. As an example, if some MapState were made queryable, such a transform might look up a specific key in the map and return its corresponding value, so that only that value is returned to the client instead of the entire map. This could be useful in cases where the client only wants a portion of the state and the state is large (this is my use case).

At a high level, I think this could be accomplished by adding an (optional) serializable Function into KvStateRequest (and related classes?) and having that transform be applied in the QueryableStateServer (or QueryableStateClientProxy?). I expect some additional TypeInformation would also have to be supplied/used in places. It should be doable in a backwards-compatible way such that if the client does not specify a transform, it works exactly as it does now.

Would there be any interest in a PR for this? This would help me with something I'm currently working on, and I'd be willing to take a crack at it. If there is interest, I'll be happy to do some more research and come up with a more concrete proposal. Thanks for Flink - it's great!

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
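The shape of the proposal can be sketched roughly as follows. This is an illustrative stand-in only: StateTransform and queryState are hypothetical names, not the actual Queryable State API, and the real version would ship the serialized transform inside KvStateRequest rather than call it in-process:

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class QueryTransformDemo {
    // The transform must be Serializable so it can travel with the query
    // to the TaskManager that holds the state.
    interface StateTransform<S, R> extends Function<S, R>, Serializable {}

    // Stand-in for the server side: apply the transform (if any) before
    // returning state, so only the result R crosses the wire. A null
    // transform preserves today's behavior (whole state returned).
    @SuppressWarnings("unchecked")
    static <S, R> R queryState(S state, StateTransform<S, R> transform) {
        return transform == null ? (R) state : transform.apply(state);
    }

    public static void main(String[] args) {
        Map<String, Integer> mapState = new HashMap<>();
        mapState.put("user-42", 7);
        mapState.put("user-99", 3);

        // Instead of shipping the whole map, extract one entry server-side.
        StateTransform<Map<String, Integer>, Integer> pickOne =
            m -> m.get("user-42");

        System.out.println(queryState(mapState, pickOne)); // prints 7
    }
}
```

The null-transform branch is what keeps the change backwards compatible: clients that never set a transform see exactly the current behavior.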