[jira] [Comment Edited] (FLINK-25672) FileSource enumerator remembers paths of all already processed files which can result in large state
[ https://issues.apache.org/jira/browse/FLINK-25672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17679874#comment-17679874 ] Cliff Resnick edited comment on FLINK-25672 at 1/23/23 4:13 PM: I imagine it may require a breaking change from what I can tell by the design, with the stateless factory for the Enumerator – or maybe there can simply be a configured TTL? Anyway, I will be looking at it. was (Author: cre...@gmail.com): I imagine it may require a breaking change from what I can tell by the design, with the stateless factory fro the Enumerator. But I will be looking at it. > FileSource enumerator remembers paths of all already processed files which > can result in large state > > > Key: FLINK-25672 > URL: https://issues.apache.org/jira/browse/FLINK-25672 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Reporter: Martijn Visser >Priority: Major > > As mentioned in the Filesystem documentation, for Unbounded File Sources, the > {{FileEnumerator}} currently remembers paths of all already processed files, > which is a state that can in some cases grow rather large. > We should look into possibilities to reduce this. We could look into adding a > compressed form of tracking already processed files (for example by keeping > modification-timestamp lower boundaries). > When fixed, this should also be reflected in the documentation, as mentioned > in https://github.com/apache/flink/pull/18288#discussion_r785707311 -- This message was sent by Atlassian Jira (v8.20.10#820010)
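The "compressed form of tracking" floated in the issue description can be sketched as follows. This is a hypothetical illustration, not Flink code: the class and method names are invented, and it assumes the enumerator may treat any file whose modification timestamp falls below a fully-processed lower bound as already seen.

```java
// Hypothetical sketch (invented names, not the Flink FileEnumerator API):
// remember only paths at or above a modification-timestamp lower bound, so
// the explicitly tracked path set stays bounded instead of growing forever.
import java.util.HashMap;
import java.util.Map;

public class BoundedProcessedFileTracker {
    private long maxProcessedTimestamp = Long.MIN_VALUE;
    // Only paths at or above the watermark need to be remembered explicitly.
    private final Map<String, Long> recentPaths = new HashMap<>();

    /** Returns true if the file should be treated as already processed. */
    public boolean isProcessed(String path, long modificationTime) {
        if (modificationTime < maxProcessedTimestamp) {
            return true; // covered by the timestamp lower bound
        }
        return recentPaths.containsKey(path);
    }

    public void markProcessed(String path, long modificationTime) {
        recentPaths.put(path, modificationTime);
    }

    /** Advance the lower bound and evict every path it now covers. */
    public void advanceWatermark(long newLowerBound) {
        maxProcessedTimestamp = Math.max(maxProcessedTimestamp, newLowerBound);
        recentPaths.entrySet().removeIf(e -> e.getValue() < maxProcessedTimestamp);
    }
}
```

The trade-off is that a file that shows up later with a modification timestamp below the watermark would be silently skipped, which is why the lower bound could only be advanced conservatively.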
[jira] [Commented] (FLINK-25672) FileSource enumerator remembers paths of all already processed files which can result in large state
[ https://issues.apache.org/jira/browse/FLINK-25672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17679874#comment-17679874 ] Cliff Resnick commented on FLINK-25672: --- I imagine it may require a breaking change from what I can tell by the design, with the stateless factory for the Enumerator. But I will be looking at it. > FileSource enumerator remembers paths of all already processed files which > can result in large state > > > Key: FLINK-25672 > URL: https://issues.apache.org/jira/browse/FLINK-25672 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Reporter: Martijn Visser >Priority: Major > > As mentioned in the Filesystem documentation, for Unbounded File Sources, the > {{FileEnumerator}} currently remembers paths of all already processed files, > which is a state that can in some cases grow rather large. > We should look into possibilities to reduce this. We could look into adding a > compressed form of tracking already processed files (for example by keeping > modification-timestamp lower boundaries). > When fixed, this should also be reflected in the documentation, as mentioned > in https://github.com/apache/flink/pull/18288#discussion_r785707311 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-25672) FileSource enumerator remembers paths of all already processed files which can result in large state
[ https://issues.apache.org/jira/browse/FLINK-25672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17679868#comment-17679868 ] Cliff Resnick commented on FLINK-25672: --- This issue has turned into a real problem for us with our transactional datastream jobs. The problem is exacerbated by the fact that the state is not distributed, but instead localized to the job manager, which is rather ugly in our HA K8s setup where we have a 16Gb limit in the common pool that our JMs run in, and we are blowing past that simply with the un-evictable file path history. Is anyone looking into this? > FileSource enumerator remembers paths of all already processed files which > can result in large state > > > Key: FLINK-25672 > URL: https://issues.apache.org/jira/browse/FLINK-25672 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Reporter: Martijn Visser >Priority: Major > > As mentioned in the Filesystem documentation, for Unbounded File Sources, the > {{FileEnumerator}} currently remembers paths of all already processed files, > which is a state that can in some cases grow rather large. > We should look into possibilities to reduce this. We could look into adding a > compressed form of tracking already processed files (for example by keeping > modification-timestamp lower boundaries). > When fixed, this should also be reflected in the documentation, as mentioned > in https://github.com/apache/flink/pull/18288#discussion_r785707311 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-11947) Support MapState value schema evolution for RocksDB
[ https://issues.apache.org/jira/browse/FLINK-11947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856993#comment-16856993 ] Cliff Resnick edited comment on FLINK-11947 at 6/5/19 7:49 PM: --- [~klion26] confirmed working! Sorry it took me so long, but thanks for your work, it's much appreciated. was (Author: cre...@gmail.com): [~klion26] confirmed working! Sorry it took me so lone, but thanks for your work, it's much appreciated. > Support MapState value schema evolution for RocksDB > --- > > Key: FLINK-11947 > URL: https://issues.apache.org/jira/browse/FLINK-11947 > Project: Flink > Issue Type: Improvement > Components: API / Type Serialization System, Runtime / State Backends >Reporter: Tzu-Li (Gordon) Tai >Assignee: Congxian Qiu(klion26) >Priority: Critical > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently, we do not attempt to perform state schema evolution if the key or > value's schema of a user {{MapState}} has changed when using {{RocksDB}}: > https://github.com/apache/flink/blob/953a5ffcbdae4115f7d525f310723cf8770779df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L542 > This was disallowed in the initial support for state schema evolution because > the way we did state evolution in the RocksDB state backend was simply > overwriting values. > For {{MapState}} key evolution, only overwriting RocksDB values does not > work, since RocksDB entries for {{MapState}} uses a composite key containing > the map state key. This means that when evolving {{MapState}} in this case > with an evolved key schema, we will have new entries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11947) Support MapState value schema evolution for RocksDB
[ https://issues.apache.org/jira/browse/FLINK-11947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856993#comment-16856993 ] Cliff Resnick commented on FLINK-11947: --- [~klion26] confirmed working! Sorry it took me so long, but thanks for your work, it's much appreciated. > Support MapState value schema evolution for RocksDB > --- > > Key: FLINK-11947 > URL: https://issues.apache.org/jira/browse/FLINK-11947 > Project: Flink > Issue Type: Improvement > Components: API / Type Serialization System, Runtime / State Backends >Reporter: Tzu-Li (Gordon) Tai >Assignee: Congxian Qiu(klion26) >Priority: Critical > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently, we do not attempt to perform state schema evolution if the key or > value's schema of a user {{MapState}} has changed when using {{RocksDB}}: > https://github.com/apache/flink/blob/953a5ffcbdae4115f7d525f310723cf8770779df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L542 > This was disallowed in the initial support for state schema evolution because > the way we did state evolution in the RocksDB state backend was simply > overwriting values. > For {{MapState}} key evolution, only overwriting RocksDB values does not > work, since RocksDB entries for {{MapState}} uses a composite key containing > the map state key. This means that when evolving {{MapState}} in this case > with an evolved key schema, we will have new entries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11947) Support MapState value schema evolution for RocksDB
[ https://issues.apache.org/jira/browse/FLINK-11947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16854459#comment-16854459 ] Cliff Resnick commented on FLINK-11947: --- I was not able to get to it on Friday, but will give it another go today. > Support MapState value schema evolution for RocksDB > --- > > Key: FLINK-11947 > URL: https://issues.apache.org/jira/browse/FLINK-11947 > Project: Flink > Issue Type: Improvement > Components: API / Type Serialization System, Runtime / State Backends >Reporter: Tzu-Li (Gordon) Tai >Assignee: Congxian Qiu(klion26) >Priority: Critical > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently, we do not attempt to perform state schema evolution if the key or > value's schema of a user {{MapState}} has changed when using {{RocksDB}}: > https://github.com/apache/flink/blob/953a5ffcbdae4115f7d525f310723cf8770779df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L542 > This was disallowed in the initial support for state schema evolution because > the way we did state evolution in the RocksDB state backend was simply > overwriting values. > For {{MapState}} key evolution, only overwriting RocksDB values does not > work, since RocksDB entries for {{MapState}} uses a composite key containing > the map state key. This means that when evolving {{MapState}} in this case > with an evolved key schema, we will have new entries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11947) Support MapState value schema evolution for RocksDB
[ https://issues.apache.org/jira/browse/FLINK-11947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16851810#comment-16851810 ] Cliff Resnick commented on FLINK-11947: --- [~klion26] great! I'll try it today. > Support MapState value schema evolution for RocksDB > --- > > Key: FLINK-11947 > URL: https://issues.apache.org/jira/browse/FLINK-11947 > Project: Flink > Issue Type: Improvement > Components: API / Type Serialization System, Runtime / State Backends >Reporter: Tzu-Li (Gordon) Tai >Assignee: Congxian Qiu(klion26) >Priority: Critical > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently, we do not attempt to perform state schema evolution if the key or > value's schema of a user {{MapState}} has changed when using {{RocksDB}}: > https://github.com/apache/flink/blob/953a5ffcbdae4115f7d525f310723cf8770779df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L542 > This was disallowed in the initial support for state schema evolution because > the way we did state evolution in the RocksDB state backend was simply > overwriting values. > For {{MapState}} key evolution, only overwriting RocksDB values does not > work, since RocksDB entries for {{MapState}} uses a composite key containing > the map state key. This means that when evolving {{MapState}} in this case > with an evolved key schema, we will have new entries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11947) Support MapState value schema evolution for RocksDB
[ https://issues.apache.org/jira/browse/FLINK-11947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16850743#comment-16850743 ] Cliff Resnick commented on FLINK-11947: --- Thanks [~klion26]. I look forward to testing your fix! > Support MapState value schema evolution for RocksDB > --- > > Key: FLINK-11947 > URL: https://issues.apache.org/jira/browse/FLINK-11947 > Project: Flink > Issue Type: Improvement > Components: API / Type Serialization System, Runtime / State Backends >Reporter: Tzu-Li (Gordon) Tai >Assignee: Congxian Qiu(klion26) >Priority: Critical > > Currently, we do not attempt to perform state schema evolution if the key or > value's schema of a user {{MapState}} has changed when using {{RocksDB}}: > https://github.com/apache/flink/blob/953a5ffcbdae4115f7d525f310723cf8770779df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L542 > This was disallowed in the initial support for state schema evolution because > the way we did state evolution in the RocksDB state backend was simply > overwriting values. > For {{MapState}} key evolution, only overwriting RocksDB values does not > work, since RocksDB entries for {{MapState}} uses a composite key containing > the map state key. This means that when evolving {{MapState}} in this case > with an evolved key schema, we will have new entries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11947) Support MapState key / value schema evolution for RocksDB
[ https://issues.apache.org/jira/browse/FLINK-11947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16847669#comment-16847669 ] Cliff Resnick commented on FLINK-11947: --- Does the de-escalation from Blocker mean that this will not likely be fixed by 1.9? > Support MapState key / value schema evolution for RocksDB > - > > Key: FLINK-11947 > URL: https://issues.apache.org/jira/browse/FLINK-11947 > Project: Flink > Issue Type: Improvement > Components: API / Type Serialization System, Runtime / State Backends >Reporter: Tzu-Li (Gordon) Tai >Priority: Critical > > Currently, we do not attempt to perform state schema evolution if the key or > value's schema of a user {{MapState}} has changed when using {{RocksDB}}: > https://github.com/apache/flink/blob/953a5ffcbdae4115f7d525f310723cf8770779df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L542 > This was disallowed in the initial support for state schema evolution because > the way we did state evolution in the RocksDB state backend was simply > overwriting values. > For {{MapState}} key evolution, only overwriting RocksDB values does not > work, since RocksDB entries for {{MapState}} uses a composite key containing > the map state key. This means that when evolving {{MapState}} in this case > with an evolved key schema, we will have new entries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12334) change to MockStreamTask breaks OneInputStreamOperatorTestHarness
[ https://issues.apache.org/jira/browse/FLINK-12334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831932#comment-16831932 ] Cliff Resnick edited comment on FLINK-12334 at 5/2/19 8:28 PM: --- Hi [~fan_li_ya] We have a custom StreamOperator with an open() method like: {{override def open(): Unit = {}} {{ getRuntimeContext.addAccumulator("combiner-in", recordsIn)}} {{ getRuntimeContext.addAccumulator("combiner-out", recordsOut)}} {{}}} The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the MockStreamTask returned by getRuntimeContext is trying to add the accumulator to an immutable empty map. was (Author: cre...@gmail.com): Hi [~fan_li_ya] We have a custom StreamOperator with an open() method like: {{override def open(): Unit = {}} {{ getRuntimeContext.addAccumulator("combiner-in", recordsIn)}} {{ getRuntimeContext.addAccumulator("combiner-out", recordsOut)}} {{ getProcessingTimeService.scheduleAtFixedRate(this, pause, pause)}} {{}}} The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the MockStreamTask returned by getRuntimeContext is trying to add the accumulator to an immutable empty map. > change to MockStreamTask breaks OneInputStreamOperatorTestHarness > - > > Key: FLINK-12334 > URL: https://issues.apache.org/jira/browse/FLINK-12334 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.8.0 >Reporter: Cliff Resnick >Priority: Major > > The MockStreamTask that the test harness is created with does not include > initialization of an Accumulator Map when using the builder > [https://github.com/apache/flink/blob/e43d55445e7abcadb92460bb4d61e28540f1189d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L198] > This results in a TestHarness whose context contains an immutable empty > map and is breaking tests. The fix is simple, please include an actual map in > the builder call.
> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12334) change to MockStreamTask breaks OneInputStreamOperatorTestHarness
[ https://issues.apache.org/jira/browse/FLINK-12334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831932#comment-16831932 ] Cliff Resnick edited comment on FLINK-12334 at 5/2/19 8:27 PM: --- Hi [~fan_li_ya] We have a custom StreamOperator with an open() method like: {{override def open(): Unit = {}} {{ getRuntimeContext.addAccumulator("combiner-in", recordsIn)}} {{ getRuntimeContext.addAccumulator("combiner-out", recordsOut)}} {{ getProcessingTimeService.scheduleAtFixedRate(this, pause, pause)}} {{}}} The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the MockStreamTask returned by getRuntimeContext is trying to add the accumulator to an immutable empty map. was (Author: cre...@gmail.com): Hi [~fan_li_ya] We have a custom StreamOperator with an open() method like: {{ override def open(): Unit = { getRuntimeContext.addAccumulator("combiner-in", recordsIn) getRuntimeContext.addAccumulator("combiner-out", recordsOut) getProcessingTimeService.scheduleAtFixedRate(this, pause, pause) } }} The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the MockStreamTask returned by getRuntimeContext is trying to add the accumulator to an immutable empty map. > change to MockStreamTask breaks OneInputStreamOperatorTestHarness > - > > Key: FLINK-12334 > URL: https://issues.apache.org/jira/browse/FLINK-12334 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.8.0 >Reporter: Cliff Resnick >Priority: Major > > The MockStreamTask that the test harness is created with does not include > initialization of an Accumulator Map when using the builder > [https://github.com/apache/flink/blob/e43d55445e7abcadb92460bb4d61e28540f1189d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L198] > This results in a TestHarness whose context contains an immutable empty > map and is breaking tests.
The fix is simple, please include an actual map in > the builder call. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12334) change to MockStreamTask breaks OneInputStreamOperatorTestHarness
[ https://issues.apache.org/jira/browse/FLINK-12334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831932#comment-16831932 ] Cliff Resnick edited comment on FLINK-12334 at 5/2/19 8:27 PM: --- Hi [~fan_li_ya] We have a custom StreamOperator with an open() method like: {{override def open(): Unit = {}} {{ getRuntimeContext.addAccumulator("combiner-in", recordsIn)}} {{ getRuntimeContext.addAccumulator("combiner-out", recordsOut)}} {{ getProcessingTimeService.scheduleAtFixedRate(this, pause, pause)}} {{}}} The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the MockStreamTask returned by getRuntimeContext is trying to add the accumulator to an immutable empty map. was (Author: cre...@gmail.com): Hi [~fan_li_ya] We have a custom StreamOperator with an open() method like: {{override def open(): Unit = {}} {{ getRuntimeContext.addAccumulator("combiner-in", recordsIn)}} {{ getRuntimeContext.addAccumulator("combiner-out", recordsOut)}} {{ getProcessingTimeService.scheduleAtFixedRate(this, pause, pause)}} {{}}} The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the MockStreamTask returned by getRuntimeContext is trying to add the accumulator to an immutable empty map. > change to MockStreamTask breaks OneInputStreamOperatorTestHarness > - > > Key: FLINK-12334 > URL: https://issues.apache.org/jira/browse/FLINK-12334 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.8.0 >Reporter: Cliff Resnick >Priority: Major > > The MockStreamTask that the test harness is created with does not include > initialization of an Accumulator Map when using the builder > [https://github.com/apache/flink/blob/e43d55445e7abcadb92460bb4d61e28540f1189d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L198] > This results in a TestHarness whose context contains an immutable empty > map and is breaking tests.
The fix is simple, please include an actual map in > the builder call. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12334) change to MockStreamTask breaks OneInputStreamOperatorTestHarness
[ https://issues.apache.org/jira/browse/FLINK-12334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831932#comment-16831932 ] Cliff Resnick edited comment on FLINK-12334 at 5/2/19 8:26 PM: --- Hi [~fan_li_ya] We have a custom StreamOperator with an open() method like: {{ override def open(): Unit = { getRuntimeContext.addAccumulator("combiner-in", recordsIn) getRuntimeContext.addAccumulator("combiner-out", recordsOut) getProcessingTimeService.scheduleAtFixedRate(this, pause, pause) } }} The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the MockStreamTask returned by getRuntimeContext is trying to add the accumulator to an immutable empty map. was (Author: cre...@gmail.com): Hi [~fan_li_ya] We have a custom StreamOperator with an open() method like: override def open(): Unit = { getRuntimeContext.addAccumulator("combiner-in", recordsIn) getRuntimeContext.addAccumulator("combiner-out", recordsOut) getProcessingTimeService.scheduleAtFixedRate(this, pause, pause) } The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the MockStreamTask returned by getRuntimeContext is trying to add the accumulator to an immutable empty map. > change to MockStreamTask breaks OneInputStreamOperatorTestHarness > - > > Key: FLINK-12334 > URL: https://issues.apache.org/jira/browse/FLINK-12334 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.8.0 >Reporter: Cliff Resnick >Priority: Major > > The MockStreamTask that the test harness is created with does not include > initialization of an Accumulator Map when using the builder > [https://github.com/apache/flink/blob/e43d55445e7abcadb92460bb4d61e28540f1189d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L198] > This results in a TestHarness whose context contains an immutable empty > map and is breaking tests. The fix is simple, please include an actual map in > the builder call.
> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12334) change to MockStreamTask breaks OneInputStreamOperatorTestHarness
[ https://issues.apache.org/jira/browse/FLINK-12334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831932#comment-16831932 ] Cliff Resnick commented on FLINK-12334: --- Hi [~fan_li_ya] We have a custom StreamOperator with an open() method like: override def open(): Unit = { getRuntimeContext.addAccumulator("combiner-in", recordsIn) getRuntimeContext.addAccumulator("combiner-out", recordsOut) getProcessingTimeService.scheduleAtFixedRate(this, pause, pause) } The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the MockStreamTask returned by getRuntimeContext is trying to add the accumulator to an immutable empty map. > change to MockStreamTask breaks OneInputStreamOperatorTestHarness > - > > Key: FLINK-12334 > URL: https://issues.apache.org/jira/browse/FLINK-12334 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.8.0 >Reporter: Cliff Resnick >Priority: Major > > The MockStreamTask that the test harness is created with does not include > initialization of an Accumulator Map when using the builder > [https://github.com/apache/flink/blob/e43d55445e7abcadb92460bb4d61e28540f1189d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L198] > This results in a TestHarness whose context contains an immutable empty > map and is breaking tests. The fix is simple, please include an actual map in > the builder call. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12334) change to MockStreamTask breaks OneInputStreamOperatorTestHarness
Cliff Resnick created FLINK-12334: - Summary: change to MockStreamTask breaks OneInputStreamOperatorTestHarness Key: FLINK-12334 URL: https://issues.apache.org/jira/browse/FLINK-12334 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.8.0 Reporter: Cliff Resnick The MockStreamTask that the test harness is created with does not include initialization of an Accumulator Map when using the builder [https://github.com/apache/flink/blob/e43d55445e7abcadb92460bb4d61e28540f1189d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L198] This results in a TestHarness whose context contains an immutable empty map and is breaking tests. The fix is simple, please include an actual map in the builder call. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
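The failure mode described in this issue can be reproduced in miniature. The sketch below is not the Flink test-harness code; with invented names it only shows why a builder that hands out `Collections.emptyMap()` breaks a later `addAccumulator`-style `put`, while a plain `HashMap` does not:

```java
// Illustrative sketch (hypothetical names, not the actual Flink classes):
// an immutable empty map rejects put() with UnsupportedOperationException,
// which is the failure the MockStreamTask change introduced.
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class AccumulatorMapDemo {
    /** Returns true iff an accumulator can be registered in the given map. */
    static boolean tryAdd(Map<String, Integer> accumulators) {
        try {
            accumulators.put("combiner-in", 0); // mirrors addAccumulator("combiner-in", ...)
            return true;
        } catch (UnsupportedOperationException e) {
            return false; // immutable map: registration fails
        }
    }

    public static void main(String[] args) {
        System.out.println(tryAdd(Collections.emptyMap())); // immutable: fails
        System.out.println(tryAdd(new HashMap<>()));        // mutable: succeeds
    }
}
```

This is why the suggested fix is simply to pass an actual (mutable) map into the builder call.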
[jira] [Commented] (FLINK-11947) Support MapState key / value schema evolution for RocksDB
[ https://issues.apache.org/jira/browse/FLINK-11947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16795072#comment-16795072 ] Cliff Resnick commented on FLINK-11947: --- Thanks for the explanation, that makes sense. However, I'm guessing in practice the vast preponderance of Schema Evolution happens on the Value side. Is the detection specific enough to perhaps make the guard exclusive to the Key side? Because it's clear that's where the technical "dragons" await, but perhaps in reality no one ever goes there. > Support MapState key / value schema evolution for RocksDB > - > > Key: FLINK-11947 > URL: https://issues.apache.org/jira/browse/FLINK-11947 > Project: Flink > Issue Type: Improvement > Components: API / Type Serialization System, Runtime / State Backends >Reporter: Tzu-Li (Gordon) Tai >Priority: Major > > Currently, we do not attempt to perform state schema evolution if the key or > value's schema of a user {{MapState}} has changed when using {{RocksDB}}: > https://github.com/apache/flink/blob/953a5ffcbdae4115f7d525f310723cf8770779df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L542 > This was disallowed in the initial support for state schema evolution because > the way we did state evolution in the RocksDB state backend was simply > overwriting values. > For {{MapState}} key evolution, only overwriting RocksDB values does not > work, since RocksDB entries for {{MapState}} uses a composite key containing > the map state key. This means that when evolving {{MapState}} in this case > with an evolved key schema, we will have new entries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
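The composite-key problem described in the issue can be illustrated without RocksDB. In the hedged sketch below a `HashMap` stands in for RocksDB's byte-keyed store, and the two `serialize` functions are invented stand-ins for a pre- and post-evolution map-key serializer; the point is only that the same logical map key serializes to different bytes, so overwriting values never reaches the old entry:

```java
// Conceptual sketch (not Flink code): because MapState entries are stored
// under a composite key that embeds the serialized map key, evolving the
// map-key schema makes the same logical key land on a new physical entry.
import java.util.HashMap;
import java.util.Map;

public class CompositeKeyDemo {
    // Hypothetical pre- and post-evolution key serializers.
    static String serializeV1(String mapKey) { return mapKey; }
    static String serializeV2(String mapKey) { return "v2|" + mapKey; }

    static Map<String, String> writeBothSchemas() {
        Map<String, String> store = new HashMap<>(); // stands in for RocksDB's key space
        store.put(serializeV1("user-42"), "oldValue"); // written before the schema change
        store.put(serializeV2("user-42"), "newValue"); // same logical key, different bytes
        return store;
    }

    public static void main(String[] args) {
        // One logical map key now occupies two physical entries:
        System.out.println(writeBothSchemas().size()); // 2
    }
}
```

Value-side evolution avoids this entirely, since the composite key is untouched and overwriting the value in place is enough, which is the distinction the comment above draws.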
[jira] [Commented] (FLINK-10671) rest monitoring api Savepoint status call fails if akka.ask.timeout < checkpoint duration
[ https://issues.apache.org/jira/browse/FLINK-10671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675184#comment-16675184 ] Cliff Resnick commented on FLINK-10671: --- [~gjy] yes, I think when I created the issue I couldn't find a way to select 1.6.0. I should have tried upgrading first! I will do so and reattempt with smaller akka.ask.timeout today. > rest monitoring api Savepoint status call fails if akka.ask.timeout < > checkpoint duration > - > > Key: FLINK-10671 > URL: https://issues.apache.org/jira/browse/FLINK-10671 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.6.1 >Reporter: Cliff Resnick >Assignee: Gary Yao >Priority: Minor > > Hi, > > There seems to be a problem with REST monitoring API: > |/jobs/:jobid/savepoints/:triggerid| > > The problem is that when the Savepoint represented by {{:triggerid}} is > called with {{cancel=true}} the above status call seems to fail if the > savepoint duration exceeds the {{akka.ask.timeout}} value. > > Below is a log in which I invoke "cancel with savepoint" then poll the above > endpoint for status at 2 second intervals. {{akka.ask.timeout}} is set for > twenty seconds. The error is repeatable at various values of > {{akka.ask.timeout}}.
> > {noformat} > 2018/10/24 19:42:25 savepoint id 925964b35b2d501f4a45b714eca0a2ca is > IN_PROGRESS > 2018/10/24 19:42:27 savepoint id 925964b35b2d501f4a45b714eca0a2ca is > IN_PROGRESS > 2018/10/24 19:42:29 savepoint id 925964b35b2d501f4a45b714eca0a2ca is > IN_PROGRESS > 2018/10/24 19:42:31 savepoint id 925964b35b2d501f4a45b714eca0a2ca is > IN_PROGRESS > 2018/10/24 19:42:33 savepoint id 925964b35b2d501f4a45b714eca0a2ca is > IN_PROGRESS > 2018/10/24 19:42:35 savepoint id 925964b35b2d501f4a45b714eca0a2ca is > IN_PROGRESS > 2018/10/24 19:42:37 savepoint id 925964b35b2d501f4a45b714eca0a2ca is > IN_PROGRESS > 2018/10/24 19:42:39 savepoint id 925964b35b2d501f4a45b714eca0a2ca is > IN_PROGRESS > 2018/10/24 19:42:41 savepoint id 925964b35b2d501f4a45b714eca0a2ca is > IN_PROGRESS > 2018/10/24 19:42:43 savepoint id 925964b35b2d501f4a45b714eca0a2ca is > IN_PROGRESS > 2018/10/24 19:42:45 Cancel with Savepoint may have failed: > java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: > Ask timed out on [Actor[akka://flink/user/jobmanager_0#-234856817]] after > [2 ms]. Sender[null] sent message of type > "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> at > java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) > at > java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) > at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911) > at > java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:770) > at akka.dispatch.OnComplete.internal(Future.scala:258) > at akka.dispatch.OnComplete.internal(Future.scala:256) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) > at > org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83) > at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) > at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) > at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603) > at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) > at > scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) > at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) > at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) > at > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) >
at java.lang.Thread.run(Thread.java:748) > Caused by: akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/jobmanager_0#-234856817]] after [2 ms]. > Sender[null] sent message of type > "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
[jira] [Created] (FLINK-10671) rest monitoring api Savepoint status call fails if akka.ask.timeout < checkpoint duration
Cliff Resnick created FLINK-10671:
-
Summary: rest monitoring api Savepoint status call fails if akka.ask.timeout < checkpoint duration
Key: FLINK-10671
URL: https://issues.apache.org/jira/browse/FLINK-10671
Project: Flink
Issue Type: Bug
Components: REST
Affects Versions: 1.6.1
Reporter: Cliff Resnick

Hi, there seems to be a problem with the REST monitoring API:

|/jobs/:jobid/savepoints/:triggerid|

The problem is that when the savepoint represented by :triggerid is called with `cancel=true`, the above status call fails if the savepoint duration exceeds the `akka.ask.timeout` value. Below is a log in which I invoke "cancel with savepoint" then poll the above endpoint for status at 2 second intervals. akka.ask.timeout is set to twenty seconds. The error is repeatable at various values of akka.ask.timeout.

2018/10/24 19:42:25 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:27 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:29 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:31 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:33 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:35 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:37 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:39 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:41 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:43 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:45 Cancel with Savepoint may have failed: java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_0#-234856817]] after [2 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:770)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_0#-234856817]] after [2 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
... 9 more

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
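A client can work around this failure mode by treating ask-timeout 500s from the status endpoint as retryable rather than terminal. The sketch below is a minimal, hypothetical polling loop in Python: `fetch_status` is a stub standing in for a real GET against `/jobs/:jobid/savepoints/:triggerid`, and the simulated response sequence mirrors the log above (several IN_PROGRESS polls, one ask-timeout error, then completion).

```python
import itertools

# Hypothetical stand-in for GET /jobs/:jobid/savepoints/:triggerid. A real
# client would issue HTTP requests against the JobManager; here the simulated
# response sequence mirrors the log above: several IN_PROGRESS polls, one
# 500 caused by an AskTimeoutException, then the savepoint completing.
_responses = itertools.chain(
    ["IN_PROGRESS"] * 3,
    ["ERROR:AskTimeoutException"],  # transient RPC timeout, not a real failure
    ["COMPLETED"],
)

def fetch_status(job_id: str, trigger_id: str) -> str:
    return next(_responses)

def poll_savepoint(job_id: str, trigger_id: str, max_polls: int = 10) -> str:
    """Poll until a terminal state, treating ask-timeout errors as retryable."""
    for _ in range(max_polls):
        status = fetch_status(job_id, trigger_id)
        if status.startswith("ERROR:AskTimeout"):
            continue  # the savepoint may still be running; retry instead of failing
        if status in ("COMPLETED", "FAILED"):
            return status
    return "UNKNOWN"

result = poll_savepoint("<jobid>", "925964b35b2d501f4a45b714eca0a2ca")
print(result)  # COMPLETED
```

The point of the sketch is only the retry policy: until the server-side behavior is fixed, a timeout-shaped error from this endpoint does not imply the savepoint failed.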
[jira] [Created] (FLINK-9339) Accumulators are not UI accessible running in FLIP-6 mode
Cliff Resnick created FLINK-9339:
-
Summary: Accumulators are not UI accessible running in FLIP-6 mode
Key: FLINK-9339
URL: https://issues.apache.org/jira/browse/FLINK-9339
Project: Flink
Issue Type: Bug
Components: REST
Affects Versions: 1.5.0
Reporter: Cliff Resnick

Using 1.5-rc2, when I run a job in FLIP-6 mode and try to access Accumulators in the UI, nothing shows. Looking at the JobManager log there is this error:

2018-05-11 17:09:04,707 ERROR org.apache.flink.runtime.rest.handler.job.SubtaskCurrentAttemptDetailsHandler - Could not create the handler request.
org.apache.flink.runtime.rest.handler.HandlerRequestException: Cannot resolve path parameter (subtaskindex) from value "accumulators".
at org.apache.flink.runtime.rest.handler.HandlerRequest.<init>(HandlerRequest.java:61)
at org.apache.flink.runtime.rest.AbstractHandler.respondAsLeader(AbstractHandler.java:155)
at org.apache.flink.runtime.rest.handler.RedirectHandler.lambda$null$0(RedirectHandler.java:139)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:748)

This error does not occur when running the same job in legacy mode.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
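The error suggests a routing ambiguity: a literal path segment (`accumulators`) is being captured by a parameterized route (`:subtaskindex`). The toy router below is a hypothetical illustration, not Flink's actual router; the two route templates are assumptions inferred from the handler names in the stack trace. It shows how naive first-match routing reproduces the symptom, and how preferring routes with more literal segments resolves it.

```python
def match(template: str, path: str):
    """Bind a URL path against a route template; ':name' segments capture."""
    t_parts = template.strip("/").split("/")
    p_parts = path.strip("/").split("/")
    if len(t_parts) != len(p_parts):
        return None
    params = {}
    for t, p in zip(t_parts, p_parts):
        if t.startswith(":"):
            params[t[1:]] = p
        elif t != p:
            return None
    return params

# Assumed route templates, inferred from the handler names in the stack trace.
routes = [
    "/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex",  # parameterized
    "/jobs/:jobid/vertices/:vertexid/subtasks/accumulators",   # literal
]
path = "/jobs/j1/vertices/v1/subtasks/accumulators"

def first_match(path):
    # Naive first-match routing reproduces the bug: "accumulators" is captured
    # as the subtaskindex path parameter and later fails integer parsing.
    for r in routes:
        m = match(r, path)
        if m is not None:
            return r, m
    return None, None

def literal_count(template):
    return sum(1 for s in template.strip("/").split("/") if not s.startswith(":"))

def best_match(path):
    # Preferring routes with more literal segments resolves the ambiguity.
    for r in sorted(routes, key=literal_count, reverse=True):
        m = match(r, path)
        if m is not None:
            return r, m
    return None, None

naive_route, naive_params = first_match(path)
best_route, best_params = best_match(path)
```

Literal-over-parameter precedence is a standard router design choice; the sketch only illustrates why the legacy endpoint worked while the FLIP-6 dispatch misbinds the segment.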
[jira] [Updated] (FLINK-8616) Missing null check in OperatorChain.CopyingChainingOutput#pushToOperator masks ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-8616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cliff Resnick updated FLINK-8616: - Summary: Missing null check in OperatorChain.CopyingChainingOutput#pushToOperator masks ClassCastException (was: Missing null check in OperatorChain.copyingChainOutput#pushToOperator masks ClassCastException) > Missing null check in OperatorChain.CopyingChainingOutput#pushToOperator > masks ClassCastException > - > > Key: FLINK-8616 > URL: https://issues.apache.org/jira/browse/FLINK-8616 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Cliff Resnick >Priority: Major > > There is an attempt to enrich the exception with outputTag#getId, but > outputTag is null, and a NullPointerException is thrown. Looking at the > attempted message enrichment the code seems to assume a ClassCastException > can only stem from SideOutput type mismatches. This may have been the norm > before, but changes to classloader delegation in 1.4 have given rise to > multiple ClassLoader conflicts (at least for us), and they all seem to end up > here. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8616) Missing null check in OperatorChain.copyingChainOutput#pushToOperator masks ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-8616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cliff Resnick updated FLINK-8616:
-
Summary: Missing null check in OperatorChain.copyingChainOutput#pushToOperator masks ClassCastException (was: Missing null check in OperatorChain#pushToOperator masks ClassCastException)
[jira] [Created] (FLINK-8616) Missing null check in OperatorChain#pushToOperator masks ClassCastException
Cliff Resnick created FLINK-8616:
-
Summary: Missing null check in OperatorChain#pushToOperator masks ClassCastException
Key: FLINK-8616
URL: https://issues.apache.org/jira/browse/FLINK-8616
Project: Flink
Issue Type: Bug
Components: DataStream API
Affects Versions: 1.4.0
Reporter: Cliff Resnick

There is an attempt to enrich the exception with outputTag#getId, but outputTag is null, and a NullPointerException is thrown. Looking at the attempted message enrichment, the code seems to assume a ClassCastException can only stem from SideOutput type mismatches. This may have been the norm before, but changes to classloader delegation in 1.4 have given rise to multiple ClassLoader conflicts (at least for us), and they all seem to end up here.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
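The failure mode described above, an exception-enrichment path that dereferences a possibly-null field and thereby masks the original error, can be sketched language-agnostically. The Python below is an illustrative analogue, not Flink's code: `TypeError` stands in for Java's `ClassCastException` and `AttributeError` for the masking `NullPointerException`.

```python
class OutputTag:
    def __init__(self, tag_id):
        self.tag_id = tag_id

def operator(record):
    # Stand-in for the downstream operator; the TypeError plays the role of
    # a ClassCastException (e.g. from a classloader conflict).
    raise TypeError("cannot cast RecordA to RecordB")

def push_to_operator_buggy(record, op, output_tag=None):
    try:
        op(record)
    except TypeError as e:
        # Mirrors the bug: output_tag is dereferenced unconditionally, so when
        # it is None an AttributeError (the NullPointerException analogue) is
        # raised here and masks the original error.
        raise TypeError(f"side output '{output_tag.tag_id}' type mismatch") from e

def push_to_operator_fixed(record, op, output_tag=None):
    try:
        op(record)
    except TypeError as e:
        if output_tag is not None:
            raise TypeError(f"side output '{output_tag.tag_id}' type mismatch") from e
        raise  # no side output involved: surface the original error untouched

masked = surfaced = None
try:
    push_to_operator_buggy("record", operator)
except Exception as e:
    masked = type(e).__name__    # AttributeError, original cause hidden
try:
    push_to_operator_fixed("record", operator)
except Exception as e:
    surfaced = type(e).__name__  # TypeError, the real failure
```

The null guard is the entire fix: enrichment only applies when a side output is actually involved, so classloader-conflict cast failures surface with their original message.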
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16064079#comment-16064079 ]

Cliff Resnick commented on FLINK-6964:
--
[~srichter] Looks good from this end, all tests passed. Thanks!

> Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
> --
>
> Key: FLINK-6964
> URL: https://issues.apache.org/jira/browse/FLINK-6964
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
>
> {{StandaloneCompletedCheckpointStore}} does not register shared states on
> resume. However, for externalized checkpoints, it registers the checkpoint
> from which it resumed. This checkpoint gets added to the completed checkpoint
> store as part of resume.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16063768#comment-16063768 ]

Cliff Resnick commented on FLINK-6964:
--
[~srichter] So far, looks good! I need to leave early today but I'll hit it a few more times this evening just to be sure.
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061270#comment-16061270 ]

Cliff Resnick commented on FLINK-6964:
--
Looks like it's still trying to register a Placeholder?
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061257#comment-16061257 ]

Cliff Resnick commented on FLINK-6964:
--
I ran with your newer precondition. It actually succeeded once, but failed the next two runs, hung on org.apache.flink.runtime.state.SharedStateRegistry - Attempt to register for key WindowOperator... I tried with just a single slot, but that also hung. The log above represents the hang condition. All the above logged here: https://gist.github.com/cresny/0e109f843730b64d3a330f8fb06bb8a6 The good news is there was an exception around the state registry.
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061216#comment-16061216 ]

Cliff Resnick commented on FLINK-6964:
--
ok, will try that. meanwhile here is a run (and hang) from last push. https://gist.github.com/cresny/8d0d24b1bd72031a515bd9a3822da189
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061178#comment-16061178 ]

Cliff Resnick commented on FLINK-6964:
--
ok I'll wait on your push
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061163#comment-16061163 ]

Cliff Resnick commented on FLINK-6964:
--
Ha! I just started running. ok, will merge and rebuild. The logging scopes above are for a separate network appender, so I tend to keep it narrow. Should I broaden it to all of flink.runtime?
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061092#comment-16061092 ]

Cliff Resnick commented on FLINK-6964:
--
I'll merge and rerun. This is what I have for log scope. Should I add anything?

{noformat}
org.apache.flink.contrib.streaming.state=TRACE
org.apache.flink.runtime.checkpoint=TRACE
org.apache.flink.runtime.state=TRACE
{noformat}
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061088#comment-16061088 ]

Cliff Resnick commented on FLINK-6964:
--
[~srichter] By hanging I mean that the checkpoint, though fully acknowledged, never completes. Looking at the UI I see 100% and a spinning arrow until the checkpoint time expires, apparently without an exception being thrown. I did not merge my added logs into your branch because, from what you described, the issue was with the {{StandaloneCompletedCheckpointStore}}, which I never added logging to anyway. However, if the code as-is in your branch has sufficient logging, I can reproduce the issue and create a gist.
[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060027#comment-16060027 ]

Cliff Resnick commented on FLINK-6964:
--
[~srichter] I tried your fix. After resuming from a checkpoint, the first subsequent checkpoint got to 100% and hung, then expired several minutes later. The second one repeated this. If logging will help, perhaps you can add some TRACE level logs and let me know the scopes, and I'll create a gist.
[jira] [Commented] (FLINK-6633) Register with shared state registry before adding to CompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16058158#comment-16058158 ]

Cliff Resnick commented on FLINK-6633:
--
Thanks [~srichter], I'll give this a try tomorrow morning EST.

> Register with shared state registry before adding to CompletedCheckpointStore
> -
>
> Key: FLINK-6633
> URL: https://issues.apache.org/jira/browse/FLINK-6633
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Affects Versions: 1.3.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Blocker
> Fix For: 1.3.0
>
> Introducing placeholders for previously existing shared state requires a change
> so that shared state is first registered with {{SharedStateRegistry}} (thereby
> being consolidated) and only afterwards added to a {{CompletedCheckpointStore}},
> so that the consolidated checkpoint is written to stable storage.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6633) Register with shared state registry before adding to CompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056537#comment-16056537 ]

Cliff Resnick commented on FLINK-6633:
--
Stefan, if you need me to unpack things further please feel free to add snippets here and I'll integrate them.
[jira] [Commented] (FLINK-6633) Register with shared state registry before adding to CompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056527#comment-16056527 ]

Cliff Resnick commented on FLINK-6633:
--
Ok, new gist is here: https://gist.github.com/cresny/87c997b558064a7ec5e8021ae7456653 This one cancels the job after two checkpoints; the first checkpoint after recovery fails with a Placeholder serialization attempt.
[jira] [Commented] (FLINK-6633) Register with shared state registry before adding to CompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056342#comment-16056342 ]

Cliff Resnick commented on FLINK-6633:
--
full log here: https://gist.github.com/cresny/d3c36896ce5692d7772979438c80944e
[jira] [Commented] (FLINK-6633) Register with shared state registry before adding to CompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056334#comment-16056334 ]

Cliff Resnick commented on FLINK-6633:
--
{noformat}
2017-06-20 18:44:39.376 [ip-10-150-96-228] INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Attempting to load RocksDB native library and store it under '/media/flink/tmp0'
2017-06-20 18:44:39.378 [ip-10-150-96-228] DEBUG org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Attempting to create RocksDB native library folder /media/flink/tmp0/rocksdb-lib-18151a61e3774f0bcd2b1adeed79010e
2017-06-20 18:44:39.476 [ip-10-150-96-228] INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Successfully loaded RocksDB native library
2017-06-20 18:44:39.521 [ip-10-150-96-228] INFO o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Initializing RocksDB keyed state backend from snapshot.
2017-06-20 18:44:39.522 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Restoring snapshot from state handles: null.
2017-06-20 18:45:45.067 [ip-10-150-96-228] INFO com.mediamath.reporting.combiner.Combiner - start combiner snapshot for id 0
2017-06-20 18:45:47.241 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/00cae703-95e3-44a9-8914-5bc4f3eb7814 [5788104 bytes]
2017-06-20 18:45:47.241 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - mapping 11.sst -> File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/00cae703-95e3-44a9-8914-5bc4f3eb7814 [5788104 bytes]
2017-06-20 18:45:47.243 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Got StreamStateHandle from stream.closeAndGetHandle ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/050ebbaf-d0e6-4a54-90f0-aee0e8eaca21'}
2017-06-20 18:45:47.451 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Got StreamStateHandle from stream.closeAndGetHandle File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/09c9f86b-8742-4871-8827-8f65f1484a8e [8289 bytes]
2017-06-20 18:45:47.451 [ip-10-150-96-228] DEBUG o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Got StreamStateHandle from stream.closeAndGetHandle ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/3db783b5-bec0-4f2a-8f9f-43b80da7ab41'}
2017-06-20 18:45:51.609 [ip-10-150-96-53] DEBUG o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serialize 1
2017-06-20 18:45:51.620 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/a5431b25-aff1-4a23-ae29-a748deba6dea [30679044 bytes]
2017-06-20 18:45:51.620 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/d00732ce-7226-4247-bdac-7f03d259e575 [23438 bytes]
2017-06-20 18:45:51.620 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/00cae703-95e3-44a9-8914-5bc4f3eb7814 [5788104 bytes]
2017-06-20 18:45:51.621 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/050ebbaf-d0e6-4a54-90f0-aee0e8eaca21'}
2017-06-20 18:45:51.621 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/09c9f86b-8742-4871-8827-8f65f1484a8e [8289 bytes]
2017-06-20 18:45:51.621 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/3db783b5-bec0-4f2a-8f9f-43b80da7ab41'}
2017-06-20 18:45:51.622 [ip-10-150-96-53] TRACE o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer - serializing File State: s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/6f6bdb7c-9bd7-48d2-8705-0f15f34ac8f8 [7209046 bytes]
2017-06-20 18:45:51.625 [ip-10-150-96-53] TRACE
[jira] [Commented] (FLINK-6633) Register with shared state registry before adding to CompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056332#comment-16056332 ]

Cliff Resnick commented on FLINK-6633:
--
The issue that [~gyfora] mentioned still exists in the current 1.4-SNAPSHOT, at least when using externalized checkpoints. It does not necessarily happen on the first checkpoint after restore, but it does seem to stem from a job restart from an externalized checkpoint. To help identify the cause I added a bit of logging to both RocksDBKeyedStateBackend and SavepointV2Serializer, the results of which I'm attaching to the issue. The log spans several checkpoints. You can see where sst files are mapped, then serialized. The last checkpoint (7) fails when it seems to try to serialize a Placeholder instead of 27.sst. I hope this helps. If I can add logging to capture more relevant state please let me know (the test is reproducible).

By the way, I also noticed that some sst files are re-serialized in subsequent checkpoints though their byte size does not change. Is that because they are still "hot" in RocksDB? I'm a bit sketchy on the concept so please forgive me!
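The ordering bug discussed in this thread can be modeled with a toy registry. This is a simplified, hypothetical sketch of the consolidate-before-persist idea from the issue summary, not Flink's actual classes: a placeholder must be swapped for the concrete handle registered by an earlier checkpoint *before* the checkpoint is serialized to the store, otherwise the placeholder leaks into serialization, mirroring the failure on 27.sst above.

```python
class ConcreteHandle:
    """A state file uploaded by some checkpoint, e.g. an sst file."""
    def __init__(self, key, path):
        self.key, self.path = key, path
    def serialize(self):
        return f"file:{self.path}"

class PlaceholderHandle:
    """Refers to state already uploaded by a previous checkpoint; it carries
    no data and therefore must never reach serialization."""
    def __init__(self, key):
        self.key = key
    def serialize(self):
        raise RuntimeError("attempt to serialize a placeholder")

class SharedStateRegistry:
    def __init__(self):
        self._by_key = {}
    def register(self, handle):
        # Consolidation: a placeholder is replaced by the concrete handle
        # registered earlier under the same key.
        if isinstance(handle, PlaceholderHandle):
            return self._by_key[handle.key]
        return self._by_key.setdefault(handle.key, handle)

def add_to_store_buggy(store, checkpoint, registry):
    # Wrong order: persist first, register afterwards. A placeholder leaks
    # into serialization, as in the failure reported above.
    store.append([h.serialize() for h in checkpoint])
    checkpoint[:] = [registry.register(h) for h in checkpoint]

def add_to_store_fixed(store, checkpoint, registry):
    # Register (consolidate) first, then persist the consolidated handles.
    checkpoint[:] = [registry.register(h) for h in checkpoint]
    store.append([h.serialize() for h in checkpoint])

registry, store = SharedStateRegistry(), []
add_to_store_fixed(store, [ConcreteHandle("27.sst", "chk-6/27.sst")], registry)
add_to_store_fixed(store, [PlaceholderHandle("27.sst")], registry)  # consolidated, ok
try:
    add_to_store_buggy(store, [PlaceholderHandle("27.sst")], registry)
    failed = False
except RuntimeError:
    failed = True
```

The sketch is only meant to make the invariant concrete: registration with the shared state registry must precede adding the checkpoint to the completed checkpoint store, which is exactly what the issue title prescribes.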
[jira] [Created] (FLINK-5646) REST api documentation missing details on jar upload
Cliff Resnick created FLINK-5646: Summary: REST API documentation missing details on jar upload Key: FLINK-5646 URL: https://issues.apache.org/jira/browse/FLINK-5646 Project: Flink Issue Type: Bug Components: Documentation Reporter: Cliff Resnick Priority: Minor The 1.2 release documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/rest_api.html) states "It is possible to upload, run, and list Flink programs via the REST APIs and web frontend". However, there is no documentation about uploading a jar via the REST API. There should be something to the effect of: "You can upload a jar file using an HTTP POST with the file data sent under a form field 'jarfile'." -- This message was sent by Atlassian JIRA (v6.3.4#6332)
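As a rough sketch of the undocumented behavior described above, the upload is a multipart/form-data POST carrying the jar bytes under the form field "jarfile". The endpoint URL below (`http://localhost:8081/jars/upload`) is an assumption for a locally running JobManager; adjust host and port for your cluster. The helper builds the request with only the standard library:

```python
# Sketch: build a multipart/form-data POST uploading a jar under the form
# field "jarfile". The target URL is an assumed local JobManager endpoint.
import io
import uuid
import urllib.request

def build_jar_upload_request(url, jar_name, jar_bytes):
    """Construct a POST request with the jar bytes under the 'jarfile' field."""
    boundary = uuid.uuid4().hex
    body = io.BytesIO()
    body.write(f"--{boundary}\r\n".encode())
    body.write(
        f'Content-Disposition: form-data; name="jarfile"; '
        f'filename="{jar_name}"\r\n'.encode()
    )
    body.write(b"Content-Type: application/x-java-archive\r\n\r\n")
    body.write(jar_bytes)
    body.write(f"\r\n--{boundary}--\r\n".encode())
    return urllib.request.Request(
        url,
        data=body.getvalue(),
        headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
        method="POST",
    )

req = build_jar_upload_request(
    "http://localhost:8081/jars/upload", "job.jar", b"\x50\x4b\x03\x04"
)
# urllib.request.urlopen(req) would perform the upload against a live cluster.
```

The same request can be issued from the command line with curl's `-F` flag, which produces an equivalent multipart body.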
[jira] [Commented] (FLINK-4228) RocksDB semi-async snapshot to S3AFileSystem fails
[ https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15773084#comment-15773084 ] Cliff Resnick commented on FLINK-4228: -- My last pull request is good to go so I guess it's up to you guys. > RocksDB semi-async snapshot to S3AFileSystem fails > -- > > Key: FLINK-4228 > URL: https://issues.apache.org/jira/browse/FLINK-4228 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi > > Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) > leads to an Exception when uploading the snapshot to S3 when using the > {{S3AFileSystem}}. > {code} > AsynchronousException{com.amazonaws.AmazonClientException: Unable to > calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory)} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870) > Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50) > at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.FileNotFoundException: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at java.io.FileInputStream.open0(Native Method) > at java.io.FileInputStream.open(FileInputStream.java:195) > at java.io.FileInputStream.<init>(FileInputStream.java:138) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294) > ... 9 more > {code} > Running with S3NFileSystem, the error does not occur. The problem might be > due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created > automatically. We might need to manually create folders and copy only actual > files for {{S3AFileSystem}}. More investigation is required. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
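The workaround the description hints at — copy only actual files rather than handing a directory to the S3 client — can be sketched as follows. This is not Flink's code; `plan_uploads`, `upload_directory`, and the `put_object` callable are illustrative names. The idea is to walk the local checkpoint directory and emit one upload per regular file, encoding the relative path into the object key (S3 has no real directories):

```python
# Hedged sketch of recursive upload done file-by-file, avoiding the
# "Is a directory" failure seen when a directory path reaches putObject.
import os

def plan_uploads(local_dir, prefix):
    """Enumerate regular files under local_dir as (absolute path, object key) pairs."""
    uploads = []
    for root, _dirs, files in os.walk(local_dir):
        for name in sorted(files):
            path = os.path.join(root, name)
            rel = os.path.relpath(path, local_dir)
            # Encode the directory structure into the key itself.
            uploads.append((path, f"{prefix}/{rel.replace(os.sep, '/')}"))
    return uploads

def upload_directory(local_dir, prefix, put_object):
    """Copy every file individually; never pass a directory to the client."""
    for path, key in plan_uploads(local_dir, prefix):
        with open(path, "rb") as f:
            put_object(key, f.read())
```

With a real client, `put_object` would wrap the S3 SDK's single-object upload; the sketch keeps it injectable so the enumeration logic can be exercised on its own.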
[jira] [Commented] (FLINK-4228) RocksDB semi-async snapshot to S3AFileSystem fails
[ https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772906#comment-15772906 ] Cliff Resnick commented on FLINK-4228: -- The issue is now exclusive to running on YARN with s3a:// as the configured FileSystem. In that case, the Flink session fails while staging itself, because it tries to copy the flink/lib directory to S3 and the S3AFileSystem does not support recursive copy. > RocksDB semi-async snapshot to S3AFileSystem fails > -- -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4228) RocksDB semi-async snapshot to S3AFileSystem fails
[ https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15390493#comment-15390493 ] Cliff Resnick commented on FLINK-4228: -- I added a pull request for this. I included the flink-yarn recursive staging upload. https://github.com/apache/flink/pull/2288 > RocksDB semi-async snapshot to S3AFileSystem fails > -- -- This message was sent by Atlassian JIRA (v6.3.4#6332)