[jira] [Comment Edited] (FLINK-25672) FileSource enumerator remembers paths of all already processed files which can result in large state

2023-01-23 Thread Cliff Resnick (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17679874#comment-17679874
 ] 

Cliff Resnick edited comment on FLINK-25672 at 1/23/23 4:13 PM:


From what I can tell of the design, it may require a breaking change, given the 
stateless factory for the Enumerator – or maybe there can simply be a 
configured TTL? Anyway, I will be looking at it. 


was (Author: cre...@gmail.com):
I imagine it may require a breaking change from what I can tell by the design, 
with the stateless factory for the Enumerator. But I will be looking at it.

> FileSource enumerator remembers paths of all already processed files which 
> can result in large state
> 
>
> Key: FLINK-25672
> URL: https://issues.apache.org/jira/browse/FLINK-25672
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Martijn Visser
>Priority: Major
>
> As mentioned in the Filesystem documentation, for Unbounded File Sources, the 
> {{FileEnumerator}} currently remembers the paths of all already processed files, 
> which is state that can in some cases grow rather large. 
> We should look into possibilities to reduce this. We could look into adding a 
> compressed form of tracking already processed files (for example by keeping 
> modification timestamp lower boundaries).
> When fixed, this should also be reflected in the documentation, as mentioned 
> in https://github.com/apache/flink/pull/18288#discussion_r785707311
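The "modification timestamp lower boundary" idea above can be sketched in plain Java. This is an illustrative sketch only, not the actual FileEnumerator implementation; the class and method names are hypothetical. The point is that paths of files modified before a boundary timestamp are treated as implicitly processed, so they can be dropped from state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: files modified strictly before `boundary` are implicitly
// considered processed, so their paths need not be kept in state at all.
class ProcessedPathTracker {
    private long boundary = Long.MIN_VALUE;                   // modification-time lower bound
    private final Map<String, Long> recent = new HashMap<>(); // path -> mod-time, at/above boundary

    boolean alreadyProcessed(String path, long modTime) {
        return modTime < boundary || recent.containsKey(path);
    }

    void markProcessed(String path, long modTime) {
        if (modTime >= boundary) {
            recent.put(path, modTime);
        }
    }

    // Advance the boundary (e.g. on checkpoint) and evict entries below it.
    void advanceBoundary(long newBoundary) {
        if (newBoundary > boundary) {
            boundary = newBoundary;
            recent.values().removeIf(t -> t < boundary);
        }
    }
}
```

Only the entries near the boundary ever occupy state; everything older is summarized by a single long, which is what bounds the state size.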



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

[jira] [Commented] (FLINK-25672) FileSource enumerator remembers paths of all already processed files which can result in large state

2023-01-23 Thread Cliff Resnick (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17679874#comment-17679874
 ] 

Cliff Resnick commented on FLINK-25672:
---

I imagine it may require a breaking change from what I can tell by the design, 
with the stateless factory for the Enumerator. But I will be looking at it.






[jira] [Commented] (FLINK-25672) FileSource enumerator remembers paths of all already processed files which can result in large state

2023-01-23 Thread Cliff Resnick (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17679868#comment-17679868
 ] 

Cliff Resnick commented on FLINK-25672:
---

This issue has turned into a real problem for us with our transactional 
datastream jobs. The problem is exacerbated by the fact that the state is 
not distributed, and is instead localized to the job manager. That is rather ugly 
in our HA K8s setup, where we have a 16Gb limit in the common pool that our JMs 
run in, and we are blowing past that simply with the un-evictable file path 
history.

Is anyone looking into this?






[jira] [Comment Edited] (FLINK-11947) Support MapState value schema evolution for RocksDB

2019-06-05 Thread Cliff Resnick (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856993#comment-16856993
 ] 

Cliff Resnick edited comment on FLINK-11947 at 6/5/19 7:49 PM:
---

[~klion26] confirmed working! Sorry it took me so long, but thanks for your 
work, it's much appreciated.


was (Author: cre...@gmail.com):
[~klion26] confirmed working! Sorry it took me so long, but thanks for your 
work, it's much appreciated.

> Support MapState value schema evolution for RocksDB
> ---
>
> Key: FLINK-11947
> URL: https://issues.apache.org/jira/browse/FLINK-11947
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System, Runtime / State Backends
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Congxian Qiu(klion26)
>Priority: Critical
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, we do not attempt to perform state schema evolution if the key or 
> value's schema of a user {{MapState}} has changed when using {{RocksDB}}:
> https://github.com/apache/flink/blob/953a5ffcbdae4115f7d525f310723cf8770779df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L542
> This was disallowed in the initial support for state schema evolution because 
> the way we did state evolution in the RocksDB state backend was simply 
> overwriting values.
> For {{MapState}} key evolution, only overwriting RocksDB values does not 
> work, since RocksDB entries for {{MapState}} use a composite key containing 
> the map state key. This means that when evolving {{MapState}} in this case 
> with an evolved key schema, we will have new entries.
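The composite-key problem described above can be illustrated without RocksDB. The byte layouts below are hypothetical and purely for illustration: once the map-key serializer evolves, the evolved entry lives under a different composite key, so writing it cannot overwrite the old entry — both end up side by side.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustration: a MapState entry is stored under (stateKey ++ mapKey) bytes.
// If the map-key serializer evolves, the composite key bytes change, so the
// old entry is not overwritten by a write of the evolved entry.
class CompositeKeyDemo {
    static byte[] composite(byte[] stateKey, byte[] mapKey) {
        byte[] out = new byte[stateKey.length + mapKey.length];
        System.arraycopy(stateKey, 0, out, 0, stateKey.length);
        System.arraycopy(mapKey, 0, out, stateKey.length, mapKey.length);
        return out;
    }

    // "Old" serializer: plain UTF-8 bytes of the map key (hypothetical).
    static byte[] oldMapKey(String k) {
        return k.getBytes(StandardCharsets.UTF_8);
    }

    // "Evolved" serializer: length-prefixed UTF-8 (a hypothetical schema change).
    static byte[] newMapKey(String k) {
        byte[] s = k.getBytes(StandardCharsets.UTF_8);
        byte[] out = new byte[s.length + 1];
        out[0] = (byte) s.length;
        System.arraycopy(s, 0, out, 1, s.length);
        return out;
    }
}
```

For value evolution the composite key is unchanged and a plain overwrite suffices, which is why the value side is the easier half of this ticket.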



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11947) Support MapState value schema evolution for RocksDB

2019-06-05 Thread Cliff Resnick (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16856993#comment-16856993
 ] 

Cliff Resnick commented on FLINK-11947:
---

[~klion26] confirmed working! Sorry it took me so long, but thanks for your 
work, it's much appreciated.






[jira] [Commented] (FLINK-11947) Support MapState value schema evolution for RocksDB

2019-06-03 Thread Cliff Resnick (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16854459#comment-16854459
 ] 

Cliff Resnick commented on FLINK-11947:
---

I was not able to get to it on Friday, but will give it another go today. 






[jira] [Commented] (FLINK-11947) Support MapState value schema evolution for RocksDB

2019-05-30 Thread Cliff Resnick (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16851810#comment-16851810
 ] 

Cliff Resnick commented on FLINK-11947:
---

[~klion26] great! I'll try it today.






[jira] [Commented] (FLINK-11947) Support MapState value schema evolution for RocksDB

2019-05-29 Thread Cliff Resnick (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16850743#comment-16850743
 ] 

Cliff Resnick commented on FLINK-11947:
---

Thanks [~klion26]. I look forward to testing your fix! 






[jira] [Commented] (FLINK-11947) Support MapState key / value schema evolution for RocksDB

2019-05-24 Thread Cliff Resnick (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16847669#comment-16847669
 ] 

Cliff Resnick commented on FLINK-11947:
---

Does the de-escalation from Blocker mean that this will likely not be fixed
by 1.9?







[jira] [Comment Edited] (FLINK-12334) change to MockStreamTask breaks OneInputStreamOperatorTestHarness

2019-05-02 Thread Cliff Resnick (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831932#comment-16831932
 ] 

Cliff Resnick edited comment on FLINK-12334 at 5/2/19 8:28 PM:
---

Hi [~fan_li_ya]

We have a custom StreamOperator with an open() method like:

 

{noformat}
override def open(): Unit = {
  getRuntimeContext.addAccumulator("combiner-in", recordsIn)
  getRuntimeContext.addAccumulator("combiner-out", recordsOut)
}
{noformat}

The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the 
MockStreamTask returned by getRuntimeContext is trying to add the accumulator 
to an immutable empty map.


was (Author: cre...@gmail.com):
Hi [~fan_li_ya]

We have a custom StreamOperator with an open() method like:

 

{noformat}
override def open(): Unit = {
  getRuntimeContext.addAccumulator("combiner-in", recordsIn)
  getRuntimeContext.addAccumulator("combiner-out", recordsOut)

  getProcessingTimeService.scheduleAtFixedRate(this, pause, pause)
}
{noformat}

The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the 
MockStreamTask returned by getRuntimeContext is trying to add the accumulator 
to an immutable empty map.

> change to MockStreamTask breaks OneInputStreamOperatorTestHarness
> -
>
> Key: FLINK-12334
> URL: https://issues.apache.org/jira/browse/FLINK-12334
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.8.0
>Reporter: Cliff Resnick
>Priority: Major
>
> The MockStreamTask is created without initialization of an Accumulator Map 
> when using the builder
> [https://github.com/apache/flink/blob/e43d55445e7abcadb92460bb4d61e28540f1189d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L198]
> This results in a TestHarness whose context contains an immutable empty 
> map, and it breaks tests. The fix is simple: please include an actual map in 
> the builder call.
>  
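The failure mode can be reproduced without Flink. This is a minimal sketch; `AccumulatorRegistry` is a hypothetical stand-in, not the actual Flink API. Writing into an immutable empty map throws, which is exactly why the test-harness builder must be handed a real, mutable map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Minimal reproduction of the bug class: a registry backed by an immutable
// empty map rejects writes, while one backed by a real HashMap accepts them.
class AccumulatorRegistry {
    private final Map<String, Object> accumulators;

    AccumulatorRegistry(Map<String, Object> backing) {
        this.accumulators = backing;
    }

    void addAccumulator(String name, Object acc) {
        // Throws UnsupportedOperationException when backed by Collections.emptyMap().
        accumulators.put(name, acc);
    }
}
```

The one-line fix the issue asks for is the second constructor argument style: pass `new HashMap<>()` rather than an immutable empty map.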



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-12334) change to MockStreamTask breaks OneInputStreamOperatorTestHarness

2019-05-02 Thread Cliff Resnick (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831932#comment-16831932
 ] 

Cliff Resnick commented on FLINK-12334:
---

Hi [~fan_li_ya]

We have a custom StreamOperator with an open() method like:

{noformat}
override def open(): Unit = {
  getRuntimeContext.addAccumulator("combiner-in", recordsIn)
  getRuntimeContext.addAccumulator("combiner-out", recordsOut)

  getProcessingTimeService.scheduleAtFixedRate(this, pause, pause)
}
{noformat}

The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the 
MockStreamTask returned by getRuntimeContext is trying to add the accumulator 
to an immutable empty map.






[jira] [Created] (FLINK-12334) change to MockStreamTask breaks OneInputStreamOperatorTestHarness

2019-04-25 Thread Cliff Resnick (JIRA)
Cliff Resnick created FLINK-12334:
-

 Summary: change to MockStreamTask breaks 
OneInputStreamOperatorTestHarness
 Key: FLINK-12334
 URL: https://issues.apache.org/jira/browse/FLINK-12334
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.8.0
Reporter: Cliff Resnick


The MockStreamTask is created without initialization of an Accumulator Map when 
using the builder

[https://github.com/apache/flink/blob/e43d55445e7abcadb92460bb4d61e28540f1189d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L198]

This results in a TestHarness whose context contains an immutable empty 
map, and it breaks tests. The fix is simple: please include an actual map in 
the builder call.





[jira] [Commented] (FLINK-11947) Support MapState key / value schema evolution for RocksDB

2019-03-18 Thread Cliff Resnick (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16795072#comment-16795072
 ] 

Cliff Resnick commented on FLINK-11947:
---

Thanks for the explanation, that makes sense. However, I'm guessing that in 
practice the vast preponderance of schema evolution happens on the value side. 
Is the detection specific enough to perhaps make the guard exclusive to the key 
side? It's clear that's where the technical "dragons" await, but perhaps in 
reality no one ever goes there. 






[jira] [Commented] (FLINK-10671) rest monitoring api Savepoint status call fails if akka.ask.timeout < checkpoint duration

2018-11-05 Thread Cliff Resnick (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675184#comment-16675184
 ] 

Cliff Resnick commented on FLINK-10671:
---

[~gjy] yes, I think when I created the issue I couldn't find a way to select 
1.6.0. I should have tried upgrading first! I will do so and reattempt with 
smaller akka.ask.timeout today.
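The polling described in this issue can be sketched independently of Flink. The status supplier is injected so the sketch stays self-contained; in practice it would GET the real REST endpoint `/jobs/:jobid/savepoints/:triggerid` and extract the status field from the JSON response. The class and method names here are illustrative, not Flink API.

```java
import java.util.function.Supplier;

// Sketch: poll a savepoint-status supplier until it reports a terminal state
// or the attempt budget is exhausted. A real poller would sleep between
// attempts (e.g. the 2-second interval used in the log below).
class SavepointPoller {
    static String pollUntilDone(Supplier<String> fetchStatus, int maxAttempts) {
        String status = "IN_PROGRESS";
        for (int i = 0; i < maxAttempts && "IN_PROGRESS".equals(status); i++) {
            status = fetchStatus.get();
        }
        return status;
    }
}
```

The bug in this ticket is that the fetch itself fails once the savepoint outlives `akka.ask.timeout`, so no amount of client-side retrying helps until the server-side timeout is fixed.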

> rest monitoring api Savepoint status call fails if akka.ask.timeout < 
> checkpoint duration
> -
>
> Key: FLINK-10671
> URL: https://issues.apache.org/jira/browse/FLINK-10671
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.6.1
>Reporter: Cliff Resnick
>Assignee: Gary Yao
>Priority: Minor
>
> Hi,
>  
> There seems to be a problem with REST monitoring API:
> |/jobs/:jobid/savepoints/:triggerid|
>  
> The problem is that when the savepoint represented by {{:triggerid}} is 
> triggered with {{cancel=true}}, the above status call seems to fail if the 
> savepoint duration exceeds the {{akka.ask.timeout}} value.
>  
> Below is a log in which I invoke "cancel with savepoint" then poll the above 
> endpoint for status at 2-second intervals. {{akka.ask.timeout}} is set to 
> twenty seconds. The error is repeatable at various values of 
> {{akka.ask.timeout}}.
>  
> {noformat}
> 2018/10/24 19:42:25 savepoint id 925964b35b2d501f4a45b714eca0a2ca is 
> IN_PROGRESS
> 2018/10/24 19:42:27 savepoint id 925964b35b2d501f4a45b714eca0a2ca is 
> IN_PROGRESS
> 2018/10/24 19:42:29 savepoint id 925964b35b2d501f4a45b714eca0a2ca is 
> IN_PROGRESS
> 2018/10/24 19:42:31 savepoint id 925964b35b2d501f4a45b714eca0a2ca is 
> IN_PROGRESS
> 2018/10/24 19:42:33 savepoint id 925964b35b2d501f4a45b714eca0a2ca is 
> IN_PROGRESS
> 2018/10/24 19:42:35 savepoint id 925964b35b2d501f4a45b714eca0a2ca is 
> IN_PROGRESS
> 2018/10/24 19:42:37 savepoint id 925964b35b2d501f4a45b714eca0a2ca is 
> IN_PROGRESS
> 2018/10/24 19:42:39 savepoint id 925964b35b2d501f4a45b714eca0a2ca is 
> IN_PROGRESS
> 2018/10/24 19:42:41 savepoint id 925964b35b2d501f4a45b714eca0a2ca is 
> IN_PROGRESS
> 2018/10/24 19:42:43 savepoint id 925964b35b2d501f4a45b714eca0a2ca is 
> IN_PROGRESS
> 2018/10/24 19:42:45 Cancel with Savepoint may have failed: 
> java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: 
> Ask timed out on [Actor[akka://flink/user/jobmanager_0#-234856817]] after 
> [2 ms]. Sender[null] sent message of type 
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> at 
> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
> at 
> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
> at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
> at 
> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:770)
> at akka.dispatch.OnComplete.internal(Future.scala:258)
> at akka.dispatch.OnComplete.internal(Future.scala:256)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/jobmanager_0#-234856817]] after [2 ms]. 
> Sender[null] sent message of type 
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".

[jira] [Created] (FLINK-10671) rest monitoring api Savepoint status call fails if akka.ask.timeout < checkpoint duration

2018-10-24 Thread Cliff Resnick (JIRA)
Cliff Resnick created FLINK-10671:
-

 Summary: rest monitoring api Savepoint status call fails if 
akka.ask.timeout < checkpoint duration
 Key: FLINK-10671
 URL: https://issues.apache.org/jira/browse/FLINK-10671
 Project: Flink
  Issue Type: Bug
  Components: REST
Affects Versions: 1.6.1
Reporter: Cliff Resnick


Hi,
 
There seems to be a problem with REST monitoring API:
|/jobs/:jobid/savepoints/:triggerid|
 
The problem is that when the savepoint represented by :triggerid was triggered 
with `cancel=true`, the above status call seems to fail if the savepoint 
duration exceeds the `akka.ask.timeout` value.
 
Below is a log in which I invoke "cancel with savepoint" and then poll the above 
endpoint for status at 2-second intervals. akka.ask.timeout is set to twenty 
seconds. The error is repeatable at various values of akka.ask.timeout.
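For context, a status poller like the one that produced the log below can be 
sketched as follows. This is a hedged sketch, not the actual client: the 
endpoint path comes from the REST API above, while `poll_savepoint`, the 
`{"status": {"id": ...}}` response shape, and the injected `get_status` 
callable are illustrative assumptions so the loop can be exercised offline.

```python
import time

def poll_savepoint(get_status, interval_s=2.0, max_polls=60):
    """Poll the /jobs/:jobid/savepoints/:triggerid status endpoint until the
    trigger reaches a terminal state.

    get_status is an injected callable returning the parsed JSON body, e.g.
    {"status": {"id": "IN_PROGRESS"}}. A failing status call (such as the
    AskTimeoutException below surfacing as an HTTP error) would raise here.
    """
    for _ in range(max_polls):
        body = get_status()
        if body["status"]["id"] != "IN_PROGRESS":
            return body  # COMPLETED or failed: stop polling
        time.sleep(interval_s)
    raise TimeoutError("savepoint status never left IN_PROGRESS")
```

With a real cluster, `get_status` would wrap an HTTP GET of that endpoint; the 
failure in this issue shows up as the status call itself erroring out once the 
savepoint outlives `akka.ask.timeout`.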
 
2018/10/24 19:42:25 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:27 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:29 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:31 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:33 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:35 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:37 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:39 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:41 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:43 savepoint id 925964b35b2d501f4a45b714eca0a2ca is IN_PROGRESS
2018/10/24 19:42:45 Cancel with Savepoint may have failed: 
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask 
timed out on [Actor[akka://flink/user/jobmanager_0#-234856817]] after [2 
ms]. Sender[null] sent message of type 
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at 
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at 
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at 
java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:770)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/jobmanager_0#-234856817]] after [2 ms]. 
Sender[null] sent message of type 
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
... 9 more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9339) Accumulators are not UI accessible running in FLIP-6 mode

2018-05-11 Thread Cliff Resnick (JIRA)
Cliff Resnick created FLINK-9339:


 Summary: Accumulators are not UI accessible running in FLIP-6 mode
 Key: FLINK-9339
 URL: https://issues.apache.org/jira/browse/FLINK-9339
 Project: Flink
  Issue Type: Bug
  Components: REST
Affects Versions: 1.5.0
Reporter: Cliff Resnick


Using 1.5-rc2, when I run a job in FLIP-6 mode and try to access Accumulators 
in the UI, nothing shows. Looking at the JobManager log, there is this error: 
 
2018-05-11 17:09:04,707 ERROR 
org.apache.flink.runtime.rest.handler.job.SubtaskCurrentAttemptDetailsHandler  
- Could not create the handler request.
org.apache.flink.runtime.rest.handler.HandlerRequestException: Cannot resolve 
path parameter (subtaskindex) from value "accumulators".
at 
org.apache.flink.runtime.rest.handler.HandlerRequest.<init>(HandlerRequest.java:61)
at 
org.apache.flink.runtime.rest.AbstractHandler.respondAsLeader(AbstractHandler.java:155)
at 
org.apache.flink.runtime.rest.handler.RedirectHandler.lambda$null$0(RedirectHandler.java:139)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:748)
 
This error does not occur when running the same job in legacy mode.
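The error message reads like a URL-routing collision: the literal 
`.../subtasks/accumulators` request appears to be matched by the parameterized 
`.../subtasks/:subtaskindex` route, so the literal segment is handed to 
`SubtaskCurrentAttemptDetailsHandler` as its path parameter. A toy matcher 
illustrating that kind of ambiguity (hypothetical code, not Flink's actual 
router):

```python
def match(template, path):
    """Match a URL path against a template; ':name' segments capture values."""
    t_parts = template.strip("/").split("/")
    p_parts = path.strip("/").split("/")
    if len(t_parts) != len(p_parts):
        return None
    params = {}
    for t, p in zip(t_parts, p_parts):
        if t.startswith(":"):
            params[t[1:]] = p  # a parameter segment accepts any value...
        elif t != p:
            return None        # ...while a literal segment must match exactly
    return params

# The parameterized route happily captures the literal "accumulators",
# which then fails to parse as a subtask index:
captured = match("/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex",
                 "/jobs/j1/vertices/v1/subtasks/accumulators")
# captured["subtaskindex"] == "accumulators"
```

If the routing works this way, the fix would be to try literal routes before 
parameterized ones (or to validate the captured value before handler dispatch).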



--


[jira] [Updated] (FLINK-8616) Missing null check in OperatorChain.CopyingChainingOutput#pushToOperator masks ClassCastException

2018-02-08 Thread Cliff Resnick (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cliff Resnick updated FLINK-8616:
-
Summary: Missing null check in 
OperatorChain.CopyingChainingOutput#pushToOperator masks ClassCastException  
(was: Missing null check in OperatorChain.copyingChainOutput#pushToOperator 
masks ClassCastException)

> Missing null check in OperatorChain.CopyingChainingOutput#pushToOperator 
> masks ClassCastException
> -
>
> Key: FLINK-8616
> URL: https://issues.apache.org/jira/browse/FLINK-8616
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Cliff Resnick
>Priority: Major
>
> There is an attempt to enrich the exception with outputTag#getId, but 
> outputTag is null, and a NullPointerException is thrown. Looking at the 
> attempted message enrichment the code seems to assume a ClassCastException 
> can only stem from SideOutput type mismatches. This may have been the norm 
> before, but changes to classloader delegation in 1.4 have given rise to 
> multiple ClassLoader conflicts (at least for us), and they all seem to end up 
> here.



--


[jira] [Updated] (FLINK-8616) Missing null check in OperatorChain.copyingChainOutput#pushToOperator masks ClassCastException

2018-02-08 Thread Cliff Resnick (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cliff Resnick updated FLINK-8616:
-
Summary: Missing null check in 
OperatorChain.copyingChainOutput#pushToOperator masks ClassCastException  (was: 
Missing null check in OperatorChain#pushToOperator masks ClassCastException)




--


[jira] [Created] (FLINK-8616) Missing null check in OperatorChain#pushToOperator masks ClassCastException

2018-02-08 Thread Cliff Resnick (JIRA)
Cliff Resnick created FLINK-8616:


 Summary: Missing null check in OperatorChain#pushToOperator masks 
ClassCastException
 Key: FLINK-8616
 URL: https://issues.apache.org/jira/browse/FLINK-8616
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.4.0
Reporter: Cliff Resnick


There is an attempt to enrich the exception with outputTag#getId, but outputTag 
is null, and a NullPointerException is thrown. Looking at the attempted message 
enrichment the code seems to assume a ClassCastException can only stem from 
SideOutput type mismatches. This may have been the norm before, but changes to 
classloader delegation in 1.4 have given rise to multiple ClassLoader conflicts 
(at least for us), and they all seem to end up here.
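The masking can be reproduced in miniature. This is a hypothetical Python 
sketch, not Flink's actual code: `TypeError` stands in for Java's 
`ClassCastException`, and the `AttributeError` on the `None` tag stands in for 
the reported `NullPointerException`.

```python
class OutputTag:
    def __init__(self, tag_id):
        self.tag_id = tag_id

def push_to_operator_buggy(process, record, output_tag):
    """Buggy variant: assumes every cast failure came from a side output and
    dereferences output_tag unconditionally -- it is None for the main output."""
    try:
        process(record)
    except TypeError:
        # output_tag is None here, so .tag_id raises and masks the real cause
        raise ValueError("record does not match side output " + output_tag.tag_id)

def push_to_operator_fixed(process, record, output_tag):
    """Fixed variant: only enrich the error when a side-output tag exists."""
    try:
        process(record)
    except TypeError as e:
        if output_tag is not None:
            raise ValueError(
                "record does not match side output " + output_tag.tag_id) from e
        raise  # main output: let the original cast failure surface
```

With the null check in place, a ClassLoader-induced cast failure on the main 
output would surface as-is instead of being replaced by the NPE.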



--


[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore

2017-06-26 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16064079#comment-16064079
 ] 

Cliff Resnick commented on FLINK-6964:
--

[~srichter] Looks good from this end, all tests passed. Thanks!

> Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore
> --
>
> Key: FLINK-6964
> URL: https://issues.apache.org/jira/browse/FLINK-6964
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> {{StandaloneCompletedCheckpointStore}} does not register shared states on 
> resume. However, for externalized checkpoints, it registers the checkpoint 
> from which it resumed. This checkpoint gets added to the completed checkpoint 
> store as part of resume.
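The mechanics can be illustrated with a toy registry (hypothetical Python, 
loosely modeled on the {{SharedStateRegistry}} contract described here): an 
incremental checkpoint references already-uploaded state via placeholders, and 
a placeholder only resolves if the restored checkpoint's real handles were 
registered on resume, which is the step the store skips.

```python
PLACEHOLDER = object()  # reference to state assumed to be registered already

class ToyRegistry:
    def __init__(self):
        self._handles = {}

    def register(self, key, handle):
        """Register a real handle, or resolve a placeholder to one."""
        if handle is PLACEHOLDER:
            if key not in self._handles:
                raise KeyError(f"placeholder for unregistered key {key!r}")
            return self._handles[key]
        return self._handles.setdefault(key, handle)

# Buggy resume: the restored checkpoint's shared files are never registered,
# so the first incremental checkpoint after resume cannot resolve them.
buggy = ToyRegistry()
try:
    buggy.register("11.sst", PLACEHOLDER)
except KeyError:
    pass  # the failure mode described above

# Fixed resume: register the restored checkpoint's handles first.
fixed = ToyRegistry()
fixed.register("11.sst", "chk-1/11.sst")
resolved = fixed.register("11.sst", PLACEHOLDER)  # resolves to the real handle
```

The key and handle strings are invented for illustration; real handles would be 
the checkpoint's file state handles.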



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore

2017-06-26 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16063768#comment-16063768
 ] 

Cliff Resnick commented on FLINK-6964:
--

[~srichter] So far, looks good! I need to leave early today but I'll hit it a 
few more times this evening just to be sure. 




--


[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore

2017-06-23 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061270#comment-16061270
 ] 

Cliff Resnick commented on FLINK-6964:
--

looks like it's still trying to register a Placeholder?




--


[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore

2017-06-23 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061257#comment-16061257
 ] 

Cliff Resnick commented on FLINK-6964:
--

I ran with your newer precondition. It actually succeeded once, but failed the 
next two runs, hung on org.apache.flink.runtime.state.SharedStateRegistry  - 
Attempt to register for key WindowOperator...  

I tried with just a single slot, but that also hung. The log above represents 
the hang condition.

All the above logged here 
https://gist.github.com/cresny/0e109f843730b64d3a330f8fb06bb8a6

The good news is there was an exception around the state registry.




--


[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore

2017-06-23 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061216#comment-16061216
 ] 

Cliff Resnick commented on FLINK-6964:
--

ok, will try that. meanwhile here is a run (and hang) from last push.

https://gist.github.com/cresny/8d0d24b1bd72031a515bd9a3822da189




--


[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore

2017-06-23 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061178#comment-16061178
 ] 

Cliff Resnick commented on FLINK-6964:
--

ok I'll wait on your push




--


[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore

2017-06-23 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061163#comment-16061163
 ] 

Cliff Resnick commented on FLINK-6964:
--

Ha! I just started running. OK, I will merge and rebuild.
The logging scopes above are for a separate network appender, so I tend to keep 
it narrow. Should I broaden it to all of flink.runtime?




--


[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore

2017-06-23 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061092#comment-16061092
 ] 

Cliff Resnick commented on FLINK-6964:
--

I'll merge and rerun. This is what I have for log scope. Should I add anything? 
{noformat}
org.apache.flink.contrib.streaming.state=TRACE
org.apache.flink.runtime.checkpoint=TRACE
org.apache.flink.runtime.state=TRACE
{noformat}





--


[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore

2017-06-23 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061088#comment-16061088
 ] 

Cliff Resnick commented on FLINK-6964:
--

[~srichter] By hanging I mean that the checkpoint, though fully acknowledged, 
never completes. Looking at the UI I see 100% and a spinning arrow until the 
checkpoint time expires, apparently without an exception being thrown. I did 
not merge my added logs into your branch because, from what you described, the 
issue was with the {code:java}StandaloneCompletedCheckpointStore{code}, which I 
never added logging to anyway. However if the code as-is in your branch has 
sufficient logging I can reproduce the issue and create a gist.




--


[jira] [Commented] (FLINK-6964) Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore

2017-06-22 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060027#comment-16060027
 ] 

Cliff Resnick commented on FLINK-6964:
--

[~srichter] I tried your fix. After resuming from a checkpoint, the first 
subsequent checkpoint got to 100% and hung, then expired several minutes later. 
The second one repeated this. If logging will help, perhaps you can add some 
TRACE-level logs and let me know the scopes, and I'll create a gist.




--


[jira] [Commented] (FLINK-6633) Register with shared state registry before adding to CompletedCheckpointStore

2017-06-21 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16058158#comment-16058158
 ] 

Cliff Resnick commented on FLINK-6633:
--

Thanks [~srichter], I'll give this a try tomorrow morning EST.

> Register with shared state registry before adding to CompletedCheckpointStore
> -
>
> Key: FLINK-6633
> URL: https://issues.apache.org/jira/browse/FLINK-6633
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Introducing placeholders for previously existing shared state requires a 
> change so that shared state is first registered with {{SharedStateRegistry}} 
> (thereby being consolidated) and only after that added to a 
> {{CompletedCheckpointStore}}, so that the consolidated checkpoint is written 
> to stable storage. 



--


[jira] [Commented] (FLINK-6633) Register with shared state registry before adding to CompletedCheckpointStore

2017-06-20 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056537#comment-16056537
 ] 

Cliff Resnick commented on FLINK-6633:
--

Stefan, if you need me to unpack things further please feel free to add 
snippets here and I'll integrate them.




--


[jira] [Commented] (FLINK-6633) Register with shared state registry before adding to CompletedCheckpointStore

2017-06-20 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056527#comment-16056527
 ] 

Cliff Resnick commented on FLINK-6633:
--

Ok, new gist is here:
https://gist.github.com/cresny/87c997b558064a7ec5e8021ae7456653

This one cancels the job after two checkpoints; the first checkpoint after 
recovery fails with a Placeholder serialization attempt.




--


[jira] [Commented] (FLINK-6633) Register with shared state registry before adding to CompletedCheckpointStore

2017-06-20 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056342#comment-16056342
 ] 

Cliff Resnick commented on FLINK-6633:
--

full log here: https://gist.github.com/cresny/d3c36896ce5692d7772979438c80944e




--


[jira] [Commented] (FLINK-6633) Register with shared state registry before adding to CompletedCheckpointStore

2017-06-20 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056334#comment-16056334
 ] 

Cliff Resnick commented on FLINK-6633:
--


{noformat}
2017-06-20 18:44:39.376 [ip-10-150-96-228] INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Attempting to 
load RocksDB native library and store it under '/media/flink/tmp0'
2017-06-20 18:44:39.378 [ip-10-150-96-228] DEBUG 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Attempting to 
create RocksDB native library folder 
/media/flink/tmp0/rocksdb-lib-18151a61e3774f0bcd2b1adeed79010e
2017-06-20 18:44:39.476 [ip-10-150-96-228] INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Successfully 
loaded RocksDB native library
2017-06-20 18:44:39.521 [ip-10-150-96-228] INFO  
o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Initializing 
RocksDB keyed state backend from snapshot.
2017-06-20 18:44:39.522 [ip-10-150-96-228] DEBUG 
o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Restoring 
snapshot from state handles: null.
2017-06-20 18:45:45.067 [ip-10-150-96-228] INFO  
com.mediamath.reporting.combiner.Combiner  - start combiner snapshot for id 0
2017-06-20 18:45:47.241 [ip-10-150-96-228] DEBUG 
o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got 
StreamStateHandle from stream.closeAndGetHandle File State: 
s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/00cae703-95e3-44a9-8914-5bc4f3eb7814
 [5788104 bytes]
2017-06-20 18:45:47.241 [ip-10-150-96-228] DEBUG 
o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - mapping 
11.sst ->  File State: 
s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/00cae703-95e3-44a9-8914-5bc4f3eb7814
 [5788104 bytes]
2017-06-20 18:45:47.243 [ip-10-150-96-228] DEBUG 
o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got 
StreamStateHandle from stream.closeAndGetHandle 
ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/050ebbaf-d0e6-4a54-90f0-aee0e8eaca21'}
2017-06-20 18:45:47.451 [ip-10-150-96-228] DEBUG 
o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got 
StreamStateHandle from stream.closeAndGetHandle File State: 
s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/09c9f86b-8742-4871-8827-8f65f1484a8e
 [8289 bytes]
2017-06-20 18:45:47.451 [ip-10-150-96-228] DEBUG 
o.a.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Got 
StreamStateHandle from stream.closeAndGetHandle 
ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/3db783b5-bec0-4f2a-8f9f-43b80da7ab41'}
2017-06-20 18:45:51.609 [ip-10-150-96-53] DEBUG 
o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serialize 1
2017-06-20 18:45:51.620 [ip-10-150-96-53] TRACE 
o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing 
File State: 
s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/a5431b25-aff1-4a23-ae29-a748deba6dea
 [30679044 bytes]
2017-06-20 18:45:51.620 [ip-10-150-96-53] TRACE 
o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing 
File State: 
s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/d00732ce-7226-4247-bdac-7f03d259e575
 [23438 bytes]
2017-06-20 18:45:51.620 [ip-10-150-96-53] TRACE 
o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing 
File State: 
s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/00cae703-95e3-44a9-8914-5bc4f3eb7814
 [5788104 bytes]
2017-06-20 18:45:51.621 [ip-10-150-96-53] TRACE 
o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing 
ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/050ebbaf-d0e6-4a54-90f0-aee0e8eaca21'}
2017-06-20 18:45:51.621 [ip-10-150-96-53] TRACE 
o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing 
File State: 
s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/09c9f86b-8742-4871-8827-8f65f1484a8e
 [8289 bytes]
2017-06-20 18:45:51.621 [ip-10-150-96-53] TRACE 
o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing 
ByteStreamStateHandle{handleName='s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/3db783b5-bec0-4f2a-8f9f-43b80da7ab41'}
2017-06-20 18:45:51.622 [ip-10-150-96-53] TRACE 
o.a.flink.runtime.checkpoint.savepoint.SavepointV2Serializer  - serializing 
File State: 
s3://mm-prod-stats-pulse-streaming/test/state/4717381ff29adb6cf31273d383179850/chk-1/6f6bdb7c-9bd7-48d2-8705-0f15f34ac8f8
 [7209046 bytes]
2017-06-20 18:45:51.625 [ip-10-150-96-53] TRACE 

[jira] [Commented] (FLINK-6633) Register with shared state registry before adding to CompletedCheckpointStore

2017-06-20 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056332#comment-16056332
 ] 

Cliff Resnick commented on FLINK-6633:
--

The issue that [~gyfora] mentioned still exists in the current 1.4-SNAPSHOT, at 
least when using externalized checkpoints. It does not necessarily happen on 
the first checkpoint after restore, but it does seem to stem from a job restart 
from an externalized checkpoint. To help identify the cause I added some 
logging to both RocksDBKeyedStateBackend and SavepointV2Serializer; the results 
are attached to the issue. The log spans several checkpoints. You can see where 
sst files are mapped, then serialized. The last checkpoint (7) fails when it 
apparently tries to serialize a placeholder instead of 27.sst. 

I hope this helps. If I can add logging to capture more relevant state, please 
let me know (the test is reproducible). 

By the way, I also noticed that some sst files are re-serialized in subsequent 
checkpoints even though their byte size does not change. Is that because they 
are still "hot" in RocksDB? I'm a bit hazy on the concept, so please forgive me! 

> Register with shared state registry before adding to CompletedCheckpointStore
> -
>
> Key: FLINK-6633
> URL: https://issues.apache.org/jira/browse/FLINK-6633
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Introducing placeholders for previously existing shared state requires a 
> change so that shared state first registers with the {{SharedStateRegistry}} 
> (thereby being consolidated) and is only afterwards added to a 
> {{CompletedCheckpointStore}}, so that the consolidated checkpoint is written 
> to stable storage. 
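The required ordering can be sketched in a few lines. This is a minimal, self-contained illustration of the register-before-store sequence described above; all class and method names here are illustrative stand-ins, not Flink's actual internal API:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the ordering fix: shared state is registered with the registry
// first (consolidating placeholders into real handles), and only afterwards
// is the checkpoint handed to the store that persists it, so the persisted
// form is the consolidated one.
public class CheckpointOrdering {
    final List<String> callOrder = new ArrayList<>();

    void registerSharedState(String checkpointId) {
        callOrder.add("register:" + checkpointId); // step 1: consolidate placeholders
    }

    void addToCompletedStore(String checkpointId) {
        callOrder.add("store:" + checkpointId);    // step 2: persist consolidated state
    }

    void completeCheckpoint(String checkpointId) {
        registerSharedState(checkpointId);
        addToCompletedStore(checkpointId);
    }
}
```

The point of the sub-task is exactly this call order: reversing the two steps would persist a checkpoint that still contains unresolved placeholders.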



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-5646) REST api documentation missing details on jar upload

2017-01-25 Thread Cliff Resnick (JIRA)
Cliff Resnick created FLINK-5646:


 Summary: REST api documentation missing details on jar upload
 Key: FLINK-5646
 URL: https://issues.apache.org/jira/browse/FLINK-5646
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Cliff Resnick
Priority: Minor


The 1.2 release documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/rest_api.html)
 states "It is possible to upload, run, and list Flink programs via the REST 
APIs and web frontend". However, there is no documentation about uploading a 
jar via the REST API. 
There should be something to the effect of:
"You can upload a jar file using an HTTP POST with the file data sent under a 
form field 'jarfile'."
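A short command-line sketch would also help. The following assumes a JobManager web frontend listening on the default port 8081; the host, port, and jar path are placeholders, and it of course requires a running cluster:

```
# Upload a jar via HTTP POST; the file data is sent under the
# multipart form field "jarfile" (host/port and path are placeholders)
curl -X POST http://localhost:8081/jars/upload \
  -F "jarfile=@/path/to/my-job.jar"
```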



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4228) RocksDB semi-async snapshot to S3AFileSystem fails

2016-12-23 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15773084#comment-15773084
 ] 

Cliff Resnick commented on FLINK-4228:
--

My last pull request is good to go so I guess it's up to you guys.

> RocksDB semi-async snapshot to S3AFileSystem fails
> --
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.<init>(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.
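The fix idea described in the last paragraph can be sketched as follows. This is a hypothetical illustration using `java.nio.file` rather than Flink's actual {{HDFSCopyToLocal}} / Hadoop FileSystem API: walk the source tree, create each target sub-directory explicitly, and hand only regular files to the copy call, so the uploader never receives a directory:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical recursive copy: directories are created manually and only
// regular files are copied, avoiding the "Is a directory" failure mode.
public class RecursiveCopy {
    public static void copyRecursively(Path source, Path target) throws IOException {
        try (var paths = Files.walk(source)) {
            paths.forEach(p -> {
                try {
                    Path dest = target.resolve(source.relativize(p).toString());
                    if (Files.isDirectory(p)) {
                        Files.createDirectories(dest);   // create folders explicitly
                    } else {
                        Files.createDirectories(dest.getParent());
                        Files.copy(p, dest, StandardCopyOption.REPLACE_EXISTING);
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }
}
```

The real fix would apply the same create-directories-then-copy-files discipline through the Hadoop/S3A file system interface instead of `java.nio.file`.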





[jira] [Commented] (FLINK-4228) RocksDB semi-async snapshot to S3AFileSystem fails

2016-12-23 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772906#comment-15772906
 ] 

Cliff Resnick commented on FLINK-4228:
--

The issue now is exclusive to running on YARN with s3a:// as your configured 
FileSystem. In that case, the Flink session will fail while staging itself, 
because it tries to copy the flink/lib directory to S3 and the S3AFileSystem 
does not support recursive copy. 

> RocksDB semi-async snapshot to S3AFileSystem fails
> --
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>





[jira] [Commented] (FLINK-4228) RocksDB semi-async snapshot to S3AFileSystem fails

2016-07-22 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15390493#comment-15390493
 ] 

Cliff Resnick commented on FLINK-4228:
--

I added a pull request for this. I included the flink-yarn recursive staging 
upload.

https://github.com/apache/flink/pull/2288

> RocksDB semi-async snapshot to S3AFileSystem fails
> --
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>


