[jira] [Resolved] (FLINK-24818) Incorrect comment of ttlConfig field in StateDescriptor

2021-11-10 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-24818.
--
Resolution: Fixed

merged in master: f8f84da44cdb833f54789c20414c354d3a00efa6

> Incorrect comment of ttlConfig field in StateDescriptor
> ---
>
> Key: FLINK-24818
> URL: https://issues.apache.org/jira/browse/FLINK-24818
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Reporter: jianzhang.yjz
>Assignee: jianzhang.yjz
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> The _*ttlConfig*_ field in the _*StateDescriptor*_ class is commented 
> incorrectly, which was probably caused by copying the comment of the _*queryableStateName*_ field.
> {code:java}
> /** Name for queries against state created from this StateDescriptor. */
> @Nullable private String queryableStateName;
> /** Name for queries against state created from this StateDescriptor. */
> @Nonnull private StateTtlConfig ttlConfig = StateTtlConfig.DISABLED; {code}
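For reference, a minimal sketch of what the corrected comment might look like (the exact wording of the merged fix may differ):
{code:java}
/** Name for queries against state created from this StateDescriptor. */
@Nullable private String queryableStateName;

/** The time-to-live (TTL) configuration of the state; TTL is disabled by default. */
@Nonnull private StateTtlConfig ttlConfig = StateTtlConfig.DISABLED;
{code}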



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24852) Cleanup of Orphaned Incremental State Artifacts

2021-11-10 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17441597#comment-17441597
 ] 

Yun Tang commented on FLINK-24852:
--

[~sewen] Thanks for bringing up this topic.

First of all, the core idea is that we still follow the rule that the job manager 
owns all permissions for state artifacts. I am not sure whether we will have 
further discussion on task-side state ownership, but I can share some prior 
experience of cleaning up orphaned state artifacts.

# We also introduced the checkpoint id into the name of the state artifact to 
identify when the artifact was created.
# As FileSystem#listStatus is very costly, we introduced 
FileSystem#listStatusIterator to mitigate the pressure on the DFS (see FLINK-11868); 
a small sketch of iterator-based listing follows below.
# Orphaned state artifacts should mainly exist only after job failover, so we 
chose to run the cleanup only after the job starts to run instead of cleaning 
periodically.

Moreover, Flink could disable this feature by default, and I think adding a 
REST API to manually trigger the cleanup would be great.
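To make the second point concrete, here is a minimal, hypothetical sketch of iterator-based listing. It uses Hadoop's FileSystem#listStatusIterator purely as an illustration; it is not Flink's actual cleanup code.
{code:java}
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class SharedDirListing {

    /** Lists a large "shared" directory lazily instead of materializing all entries at once. */
    public static void listSharedDir(Path sharedDir) throws IOException {
        FileSystem fs = sharedDir.getFileSystem(new Configuration());
        RemoteIterator<FileStatus> it = fs.listStatusIterator(sharedDir);
        while (it.hasNext()) {
            FileStatus status = it.next();
            // Entries are fetched incrementally, which keeps the pressure on the DFS lower
            // than a single bulk listStatus() call over a huge directory.
            System.out.println(status.getPath() + " (" + status.getLen() + " bytes)");
        }
    }
}
{code}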

> Cleanup of Orphaned Incremental State Artifacts
> ---
>
> Key: FLINK-24852
> URL: https://issues.apache.org/jira/browse/FLINK-24852
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.14.0
>Reporter: Stephan Ewen
>Priority: Major
>
> Shared State Artifacts (state files in the "shared" folder in the DFS / 
> ObjectStore) can become orphaned in various situations:
> * When a TaskManager fails right after it created a state file but before the 
> checkpoint was ack-ed to the JobManager, that state file will be orphaned.
> * When the JobManager fails all state newly added for the currently pending 
> checkpoint will be orphaned.
> These state artifacts are currently impossible to be cleaned up manually, 
> because it isn't easily possible to understand whether they are still being 
> used (referenced by any checkpoint).
> We should introduce a "garbage collector" that identifies and deletes such 
> orphaned state artifacts.
> h2. Idea for a cleanup mechanism
> A periodic cleanup thread would periodically execute a cleanup procedure that 
> searches for and deletes the orphaned artifacts.
> To identify those artifacts, the cleanup procedure needs the following inputs:
> * The oldest retained checkpoint ID
> * A snapshot of the shared state registry
> * A way to identify for each state artifact from which checkpoint it was 
> created.
> The cleanup procedure would
> * enumerate all state artifacts (for example files in the "shared" directory)
> * For each one check whether it was created earlier than the oldest retained 
> checkpoint. If not, that artifact would be skipped, because it might come 
> from a later pending checkpoint, or later canceled checkpoint.
> * Finally, the procedure checks if the state artifact is known by the shared 
> state registry. If yes, the artifact is kept, if not, it is orphaned and will 
> be deleted.
> Because the cleanup procedure is specific to the checkpoint storage, it 
> should probably be instantiated from the checkpoint storage.
> To make it possible to identify the checkpoint for which a state artifact was 
> created, we can put that checkpoint ID into the state file name, for example 
> format the state name as {{"_"}}.
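For illustration only, a minimal sketch of the cleanup procedure outlined above. The helper names (parseCheckpointId, the registeredArtifacts set) are hypothetical and not part of Flink's API; the real procedure would be instantiated from the checkpoint storage as described.
{code:java}
import java.io.IOException;
import java.util.Set;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class OrphanedStateCleaner {

    /**
     * Deletes artifacts in the "shared" directory that were created before the oldest
     * retained checkpoint and are no longer referenced by the shared state registry.
     */
    public static void cleanup(
            Path sharedDir, long oldestRetainedCheckpointId, Set<Path> registeredArtifacts)
            throws IOException {
        FileSystem fs = sharedDir.getFileSystem();
        for (FileStatus status : fs.listStatus(sharedDir)) {
            Path artifact = status.getPath();
            long creatingCheckpointId = parseCheckpointId(artifact); // hypothetical helper
            if (creatingCheckpointId >= oldestRetainedCheckpointId) {
                // Might belong to a pending or recently cancelled checkpoint; skip it.
                continue;
            }
            if (!registeredArtifacts.contains(artifact)) {
                // Older than the oldest retained checkpoint and unreferenced: orphaned.
                fs.delete(artifact, false);
            }
        }
    }

    /** Hypothetical: extracts the checkpoint id encoded at the start of the artifact's file name. */
    private static long parseCheckpointId(Path artifact) {
        String name = artifact.getName();
        return Long.parseLong(name.substring(0, name.indexOf('_')));
    }
}
{code}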



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24818) Incorrect comment of ttlConfig field in StateDescriptor

2021-11-09 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17440984#comment-17440984
 ] 

Yun Tang commented on FLINK-24818:
--

[~jianzhang.yjz] Thanks for pointing out this documentation error; I have already 
assigned it to you.

> Incorrect comment of ttlConfig field in StateDescriptor
> ---
>
> Key: FLINK-24818
> URL: https://issues.apache.org/jira/browse/FLINK-24818
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Reporter: jianzhang.yjz
>Assignee: jianzhang.yjz
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> The _*ttlConfig*_ field in the _*StateDescriptor*_ class is commented 
> incorrectly, which was probably caused by copying the comment of the _*queryableStateName*_ field.
> {code:java}
> /** Name for queries against state created from this StateDescriptor. */
> @Nullable private String queryableStateName;
> /** Name for queries against state created from this StateDescriptor. */
> @Nonnull private StateTtlConfig ttlConfig = StateTtlConfig.DISABLED; {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-24818) Incorrect comment of ttlConfig field in StateDescriptor

2021-11-09 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-24818:


Assignee: jianzhang.yjz

> Incorrect comment of ttlConfig field in StateDescriptor
> ---
>
> Key: FLINK-24818
> URL: https://issues.apache.org/jira/browse/FLINK-24818
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Reporter: jianzhang.yjz
>Assignee: jianzhang.yjz
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> The _*ttlConfig*_ field in the _*StateDescriptor*_ class is commented 
> incorrectly, which was probably caused by copying the comment of the _*queryableStateName*_ field.
> {code:java}
> /** Name for queries against state created from this StateDescriptor. */
> @Nullable private String queryableStateName;
> /** Name for queries against state created from this StateDescriptor. */
> @Nonnull private StateTtlConfig ttlConfig = StateTtlConfig.DISABLED; {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-24783) Improve monitoring experience and usability of state backend

2021-11-07 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17440160#comment-17440160
 ] 

Yun Tang edited comment on FLINK-24783 at 11/8/21, 3:13 AM:


[~sjwiesman] [~nkruber], I think it would be helpful to add such a configurable 
option to forward RocksDB logs, and you could create a sub-task under this 
ticket.


was (Author: yunta):
[~sjwiesman][~nkruber], I think it would be helpful to add such configurable 
option to forward RocksDB logs, and you could create a sub-task under this 
ticket.

> Improve monitoring experience and usability of state backend 
> -
>
> Key: FLINK-24783
> URL: https://issues.apache.org/jira/browse/FLINK-24783
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>
> This ticket targets improving the monitoring experience and usability 
> of the HashMap and EmbeddedRocksDB state backends.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24783) Improve monitoring experience and usability of state backend

2021-11-07 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17440160#comment-17440160
 ] 

Yun Tang commented on FLINK-24783:
--

[~sjwiesman][~nkruber], I think it would be helpful to add such a configurable 
option to forward RocksDB logs, and you could create a sub-task under this 
ticket.

> Improve monitoring experience and usability of state backend 
> -
>
> Key: FLINK-24783
> URL: https://issues.apache.org/jira/browse/FLINK-24783
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>
> This ticket targets improving the monitoring experience and usability 
> of the HashMap and EmbeddedRocksDB state backends.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (FLINK-24475) Remove no longer used NestedMap* classes

2021-11-07 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-24475.
--
Resolution: Fixed

Merged in master: fc4f255644a64bb556b0dcefb165a9c772164c5b

> Remove no longer used NestedMap* classes
> 
>
> Key: FLINK-24475
> URL: https://issues.apache.org/jira/browse/FLINK-24475
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.0, 1.13.2, 1.15.0
>Reporter: Piotr Nowojski
>Assignee: Zakelly Lan
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> After FLINK-21935, all of the {{NestedMapsStateTable}} classes are no longer 
> used in the production code. However, they are still being used in benchmarks 
> and in some tests. The benchmarks/tests should be migrated to the {{CopyOnWrite}} 
> versions, while the {{NestedMaps}} classes should be removed.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24783) Improve monitoring experience and usability of state backend

2021-11-05 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-24783:
-
Summary: Improve monitoring experience and usability of state backend   
(was: Improve state backend monitoring experience and usability)

> Improve monitoring experience and usability of state backend 
> -
>
> Key: FLINK-24783
> URL: https://issues.apache.org/jira/browse/FLINK-24783
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>
> This ticket targets improving the monitoring experience and usability 
> of the HashMap and EmbeddedRocksDB state backends.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24784) Enable state.backend.latency-track.state-name-as-variable by default

2021-11-05 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-24784:
-
Description: This could help improve the usability of state access latency tracking.

> Enable state.backend.latency-track.state-name-as-variable by default
> 
>
> Key: FLINK-24784
> URL: https://issues.apache.org/jira/browse/FLINK-24784
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics, Runtime / State Backends
>Reporter: Yun Tang
>Priority: Major
> Fix For: 1.15.0
>
>
> This could help improve the usability of state access latency tracking.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24786) Introduce RocksDB's statistics related options

2021-11-05 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-24786:
-
Description: Currently, Flink only supports RocksDB's property-related native 
metrics. However, such metrics cannot help with performance tuning; we could 
introduce statistics-related metrics to help debug performance problems.  
(was: Currently, Flink only support RocksDB's native metrics of 
property related. However, such metrics cannot help on performance tunning, we 
could introduce statistis related metrics to help debug performance related 
problem.)
Summary: Introduce RocksDB's statistics related options  (was: 
Introduce RocksDB's statistis related options)

> Introduce RocksDB's statistics related options
> --
>
> Key: FLINK-24786
> URL: https://issues.apache.org/jira/browse/FLINK-24786
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics, Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.15.0
>
>
> Currently, Flink only supports RocksDB's property-related native metrics. 
> However, such metrics cannot help with performance tuning; we could introduce 
> statistics-related metrics to help debug performance problems.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24788) Report RocksDB statistics as metrics

2021-11-05 Thread Yun Tang (Jira)
Yun Tang created FLINK-24788:


 Summary: Report RocksDB statistics as metrics
 Key: FLINK-24788
 URL: https://issues.apache.org/jira/browse/FLINK-24788
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Metrics, Runtime / State Backends
Reporter: Yun Tang
Assignee: Yun Tang
 Fix For: 1.15.0


Integrate RocksDB's statistics with Flink's metrics reporting mechanism.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24786) Introduce RocksDB's statistis related options

2021-11-05 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-24786:
-
Summary: Introduce RocksDB's statistis related options  (was: Introduce 
metrics of RocksDB's statistis)

> Introduce RocksDB's statistis related options
> -
>
> Key: FLINK-24786
> URL: https://issues.apache.org/jira/browse/FLINK-24786
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics, Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.15.0
>
>
> Currently, Flink only supports RocksDB's property-related native metrics. 
> However, such metrics cannot help with performance tuning; we could introduce 
> statistics-related metrics to help debug performance problems.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24787) Add more details of state latency tracking documentation

2021-11-05 Thread Yun Tang (Jira)
Yun Tang created FLINK-24787:


 Summary: Add more details of state latency tracking documentation
 Key: FLINK-24787
 URL: https://issues.apache.org/jira/browse/FLINK-24787
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation, Runtime / Metrics, Runtime / State Backends
Reporter: Yun Tang
 Fix For: 1.15.0, 1.14.1


The current documentation only explains how to enable or configure the state latency 
tracking related options. We could add more detailed, state-specific 
descriptions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24786) Introduce metrics of RocksDB's statistis

2021-11-05 Thread Yun Tang (Jira)
Yun Tang created FLINK-24786:


 Summary: Introduce metrics of RocksDB's statistis
 Key: FLINK-24786
 URL: https://issues.apache.org/jira/browse/FLINK-24786
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Metrics, Runtime / State Backends
Reporter: Yun Tang
Assignee: Yun Tang
 Fix For: 1.15.0


Currently, Flink only supports RocksDB's property-related native metrics. 
However, such metrics cannot help with performance tuning; we could introduce 
statistics-related metrics to help debug performance problems.
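As a rough illustration of what "statistics related" means here: with the RocksJava API bundled in Flink, one can already attach a Statistics object through a custom RocksDBOptionsFactory. This is only a sketch of that mechanism, not the options this ticket proposes.
{code:java}
import java.util.Collection;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.Statistics;

public class StatisticsOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Attach a Statistics object so RocksDB collects its tickers and histograms.
        Statistics statistics = new Statistics();
        handlesToClose.add(statistics);
        return currentOptions.setStatistics(statistics);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }
}
{code}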



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24785) Relocate RocksDB's log under flink log directory by default

2021-11-05 Thread Yun Tang (Jira)
Yun Tang created FLINK-24785:


 Summary: Relocate RocksDB's log under flink log directory by 
default
 Key: FLINK-24785
 URL: https://issues.apache.org/jira/browse/FLINK-24785
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Yun Tang
 Fix For: 1.15.0


Previously, RocksDB's log was located in its own DB folder, which makes 
debugging RocksDB not so easy. We could let RocksDB's log stay in Flink's log 
directory by default.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24784) Enable state.backend.latency-track.state-name-as-variable by default

2021-11-05 Thread Yun Tang (Jira)
Yun Tang created FLINK-24784:


 Summary: Enable state.backend.latency-track.state-name-as-variable 
by default
 Key: FLINK-24784
 URL: https://issues.apache.org/jira/browse/FLINK-24784
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Metrics, Runtime / State Backends
Reporter: Yun Tang
 Fix For: 1.15.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24783) Improve state backend monitoring experience and usability

2021-11-05 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-24783:
-
Component/s: Runtime / Metrics

> Improve state backend monitoring experience and usability
> -
>
> Key: FLINK-24783
> URL: https://issues.apache.org/jira/browse/FLINK-24783
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>
> This ticket targets improving the monitoring experience and usability 
> of the HashMap and EmbeddedRocksDB state backends.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24783) Improve state backend monitoring experience and usability

2021-11-05 Thread Yun Tang (Jira)
Yun Tang created FLINK-24783:


 Summary: Improve state backend monitoring experience and usability
 Key: FLINK-24783
 URL: https://issues.apache.org/jira/browse/FLINK-24783
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Yun Tang
Assignee: Yun Tang


This ticket targets improving the monitoring experience and usability of the 
HashMap and EmbeddedRocksDB state backends.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24046) Refactor the relationship bwtween PredefinedOptions and RocksDBConfigurableOptions

2021-11-04 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17439002#comment-17439002
 ] 

Yun Tang commented on FLINK-24046:
--

[~Zakelly] Since {{DefaultConfigurableOptionsFactory}} has existed in the 
[documentation|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/state_backends/#passing-options-factory-to-rocksdb]
 for a long time, I don't think we should remove it directly, for the sake of backward 
compatibility.

> Refactor the relationship bwtween PredefinedOptions and 
> RocksDBConfigurableOptions
> --
>
> Key: FLINK-24046
> URL: https://issues.apache.org/jira/browse/FLINK-24046
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Zakelly Lan
>Priority: Major
> Fix For: 1.14.1
>
>
> RocksDBConfigurableOptions mainly focuses on the settings of DBOptions and 
> ColumnFamilyOptions. The original design of this class was to let users 
> configure RocksDB via configuration options instead of a programmatically 
> implemented RocksDBOptionsFactory.
> To keep the change minimal, the original options in RocksDBConfigurableOptions 
> have no default values, so that DefaultConfigurableOptionsFactory behaves 
> just as before.
> However, this makes the meaning of options without default values unclear to 
> users, and we could consider changing the relationship between the two classes.
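For context, a small sketch of the configuration-based route the description refers to. The option keys shown (state.backend.rocksdb.thread.num, state.backend.rocksdb.writebuffer.size) come from RocksDBConfigurableOptions; the values are arbitrary examples.
{code:java}
import org.apache.flink.configuration.Configuration;

public class RocksDBConfigExample {

    /** Builds a Flink configuration that tunes RocksDB without a programmatic options factory. */
    public static Configuration rocksDbTuning() {
        Configuration config = new Configuration();
        // Keys defined in RocksDBConfigurableOptions; example values only.
        config.setString("state.backend.rocksdb.thread.num", "4");
        config.setString("state.backend.rocksdb.writebuffer.size", "64mb");
        return config;
    }
}
{code}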



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-24597) RocksdbStateBackend getKeysAndNamespaces would return duplicate data when using MapState

2021-11-04 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-24597.
--
Resolution: Fixed

Merged
master: a907d92673a711612b287d184c00dad7fa42269f
release-1.14: 547b3befcff50bf8fc2bef4e596cd39a55c7d4b2
release-1.13: 77824d10edb3fd299c242865a3a03fe08e540a85

> RocksdbStateBackend getKeysAndNamespaces would return duplicate data when 
> using MapState 
> -
>
> Key: FLINK-24597
> URL: https://issues.apache.org/jira/browse/FLINK-24597
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor, Runtime / State Backends
>Affects Versions: 1.14.0, 1.12.4, 1.13.3
>Reporter: Yue Ma
>Assignee: Yue Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1, 1.13.4
>
> Attachments: image-2021-11-01-14-19-58-372.png
>
>
> For example, in RocksdbStateBackend, if we work in VoidNamespace and use 
> ValueState like below:
> {code:java}
> // insert record
> for (int i = 0; i < 3; ++i) {
>     keyedStateBackend.setCurrentKey(i);
>     testValueState.update(String.valueOf(i));
> }
> {code}
> Then we get all the keysAndNamespaces via 
> RocksDBKeyedStateBackend#getKeysAndNamespaces(). The result of the traversal is
>  <1,VoidNamespace>,<2,VoidNamespace>,<3,VoidNamespace>, which is as expected.
> However, if we use MapState and update the MapState with different user keys, 
> getKeysAndNamespaces would return duplicate data with the same 
> keyAndNamespace.
> {code:java}
> // insert record
> for (int i = 0; i < 3; ++i) {
>     keyedStateBackend.setCurrentKey(i);
>     mapState.put("userKeyA_" + i, "userValue");
>     mapState.put("userKeyB_" + i, "userValue");
> }
> {code}
> The result of the traversal is
>  <1,VoidNamespace>,<1,VoidNamespace>,<2,VoidNamespace>,<2,VoidNamespace>,<3,VoidNamespace>,<3,VoidNamespace>.
> By reading the code, I found that the main reason for this problem is in the 
> implementation of _RocksStateKeysAndNamespaceIterator_:
> in the _hasNext_ method, when a new keyAndNamespace is created, there is no 
> comparison with the previousKeyAndNamespace. So referring to 
> RocksStateKeysIterator and implementing the same logic should solve this problem.
>   
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24597) RocksdbStateBackend getKeysAndNamespaces would return duplicate data when using MapState

2021-11-04 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-24597:
-
Fix Version/s: 1.13.4
   1.14.1
   1.15.0

> RocksdbStateBackend getKeysAndNamespaces would return duplicate data when 
> using MapState 
> -
>
> Key: FLINK-24597
> URL: https://issues.apache.org/jira/browse/FLINK-24597
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor, Runtime / State Backends
>Affects Versions: 1.14.0, 1.12.4, 1.13.3
>Reporter: Yue Ma
>Assignee: Yue Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1, 1.13.4
>
> Attachments: image-2021-11-01-14-19-58-372.png
>
>
> For example, in RocksdbStateBackend, if we work in VoidNamespace and use 
> ValueState like below:
> {code:java}
> // insert record
> for (int i = 0; i < 3; ++i) {
>     keyedStateBackend.setCurrentKey(i);
>     testValueState.update(String.valueOf(i));
> }
> {code}
> Then we get all the keysAndNamespaces via 
> RocksDBKeyedStateBackend#getKeysAndNamespaces(). The result of the traversal is
>  <1,VoidNamespace>,<2,VoidNamespace>,<3,VoidNamespace>, which is as expected.
> However, if we use MapState and update the MapState with different user keys, 
> getKeysAndNamespaces would return duplicate data with the same 
> keyAndNamespace.
> {code:java}
> // insert record
> for (int i = 0; i < 3; ++i) {
>     keyedStateBackend.setCurrentKey(i);
>     mapState.put("userKeyA_" + i, "userValue");
>     mapState.put("userKeyB_" + i, "userValue");
> }
> {code}
> The result of the traversal is
>  <1,VoidNamespace>,<1,VoidNamespace>,<2,VoidNamespace>,<2,VoidNamespace>,<3,VoidNamespace>,<3,VoidNamespace>.
> By reading the code, I found that the main reason for this problem is in the 
> implementation of _RocksStateKeysAndNamespaceIterator_:
> in the _hasNext_ method, when a new keyAndNamespace is created, there is no 
> comparison with the previousKeyAndNamespace. So referring to 
> RocksStateKeysIterator and implementing the same logic should solve this problem.
>   
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-18473) Optimize RocksDB disk load balancing strategy

2021-11-04 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-18473.

Resolution: Information Provided

> Optimize RocksDB disk load balancing strategy
> -
>
> Key: FLINK-18473
> URL: https://issues.apache.org/jira/browse/FLINK-18473
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.12.0
>Reporter: future
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
> In general, bigdata servers have many disks. For large-state jobs, if 
> multiple slots are running on a TM, then each slot will create a RocksDB 
> instance. We hope that multiple RocksDB instances use different disks to 
> achieve load balancing.
> h3. The problem of current load balancing strategy:
> When the current RocksDB is initialized, a random value nextDirectory is 
> generated according to the number of RocksDB dir: [code 
> link|https://github.com/apache/flink/blob/2d371eb5ac9a3e485d3665cb9a740c65e2ba2ac6/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java#L441]
> {code:java}
> nextDirectory = new Random().nextInt(initializedDbBasePaths.length);
> {code}
> Different slots create different RocksDBStateBackend objects, so each slot 
> will generate its own *nextDirectory*. Because a random algorithm is used here, the 
> random values generated by different slots may be the same. For example: if the 
> RocksDB dir is configured with 10 disks and the *nextDirectory* 
> generated by slot0 and slot1 is 5 for both, then slot0 and slot1 will use the 
> same disk. That disk will be under a lot of pressure while the other disks will not be 
> under pressure.
> h3. Optimization ideas:
> *{{nextDirectory}}* should be shared across slots; its initial value should not be 
> fixed at 0 but still generated randomly. Define 
> *nextDirectory* as a +_{{static AtomicInteger()}}_+ and execute 
> +_{{nextDirectory.incrementAndGet()}}_+ every time a RocksDBKeyedStateBackend 
> is requested. 
> {{nextDirectory}} modulo {{initializedDbBasePaths.length}} then 
> decides which disk to use.
> Is there any problem with the above ideas?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18473) Optimize RocksDB disk load balancing strategy

2021-11-04 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17438997#comment-17438997
 ] 

Yun Tang commented on FLINK-18473:
--

[~fanrui] I think your proposed idea cannot resolve the case of running multiple 
RocksDB instances in different task managers on the same machine (they would always go from 
disk-0 to disk-N). Since this ticket has been inactive for one year, I will close 
it; please feel free to reopen it if you have more ideas.

> Optimize RocksDB disk load balancing strategy
> -
>
> Key: FLINK-18473
> URL: https://issues.apache.org/jira/browse/FLINK-18473
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.12.0
>Reporter: future
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
> In general, bigdata servers have many disks. For large-state jobs, if 
> multiple slots are running on a TM, then each slot will create a RocksDB 
> instance. We hope that multiple RocksDB instances use different disks to 
> achieve load balancing.
> h3. The problem of current load balancing strategy:
> When the current RocksDB is initialized, a random value nextDirectory is 
> generated according to the number of RocksDB dir: [code 
> link|https://github.com/apache/flink/blob/2d371eb5ac9a3e485d3665cb9a740c65e2ba2ac6/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java#L441]
> {code:java}
> nextDirectory = new Random().nextInt(initializedDbBasePaths.length);
> {code}
> Different slots create different RocksDBStateBackend objects, so each slot 
> will generate its own *nextDirectory*. Because a random algorithm is used here, the 
> random values generated by different slots may be the same. For example: if the 
> RocksDB dir is configured with 10 disks and the *nextDirectory* 
> generated by slot0 and slot1 is 5 for both, then slot0 and slot1 will use the 
> same disk. That disk will be under a lot of pressure while the other disks will not be 
> under pressure.
> h3. Optimization ideas:
> *{{nextDirectory}}* should be shared across slots; its initial value should not be 
> fixed at 0 but still generated randomly. Define 
> *nextDirectory* as a +_{{static AtomicInteger()}}_+ and execute 
> +_{{nextDirectory.incrementAndGet()}}_+ every time a RocksDBKeyedStateBackend 
> is requested. 
> {{nextDirectory}} modulo {{initializedDbBasePaths.length}} then 
> decides which disk to use.
> Is there any problem with the above ideas?
>  
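For reference, a minimal sketch of the round-robin idea from the description above (hypothetical code, not Flink's implementation). As noted in the comment, it still cannot coordinate across TaskManagers on the same machine because the counter is process-local.
{code:java}
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

public class DbPathSelector {

    // Shared across all slots in the TaskManager process; starts at a random offset.
    private static final AtomicInteger NEXT_DIRECTORY =
            new AtomicInteger(ThreadLocalRandom.current().nextInt(1024));

    /** Picks the next base path in round-robin order for a new RocksDB instance. */
    public static int pickDirectory(int numberOfBasePaths) {
        // Math.floorMod avoids negative indices if the counter ever wraps around.
        return Math.floorMod(NEXT_DIRECTORY.getAndIncrement(), numberOfBasePaths);
    }
}
{code}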



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21543) when using FIFO compaction, I found sst being deleted on the first checkpoint

2021-11-02 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17437722#comment-17437722
 ] 

Yun Tang commented on FLINK-21543:
--

[~Ming Li], you could refer to RocksDB's documentation: 
https://github.com/facebook/rocksdb/wiki/FIFO-compaction-style

{{"Please use FIFO compaction style with caution. Unlike other compaction 
style, it can drop data without informing users."}}

> when using FIFO compaction, I found sst being deleted on the first checkpoint
> -
>
> Key: FLINK-21543
> URL: https://issues.apache.org/jira/browse/FLINK-21543
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: xiaogang zhou
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
> Attachments: LOG (2), image-2021-03-03-11-35-11-458.png, 
> image-2021-03-03-13-09-01-695.png
>
>
> 2021/03/01-18:51:01.202049 7f59042fc700 (Original Log Time 
> 2021/03/01-18:51:01.200883) [/compaction/compaction_picker_fifo.cc:107] 
> [_timer_state/processing_user-timers] FIFO compaction: picking file 1710 with 
> creation time 0 for deletion
>  
> the configuration is like 
> {code:java}
> currentOptions.setCompactionStyle(getCompactionStyle());
> currentOptions.setLevel0FileNumCompactionTrigger(8);
> // currentOptions.setMaxTableFilesSizeFIFO(MemorySize.parse("2gb").getBytes());
> CompactionOptionsFIFO compactionOptionsFIFO = new CompactionOptionsFIFO();
> compactionOptionsFIFO.setMaxTableFilesSize(MemorySize.parse("8gb").getBytes());
> compactionOptionsFIFO.setAllowCompaction(true);
> {code}
> and the rocksdb version is
> {code:xml}
> <dependency>
>   <groupId>io.github.myasuka</groupId>
>   <artifactId>frocksdbjni</artifactId>
>   <version>6.10.2-ververica-3.0</version>
> </dependency>
> {code}
> I think the problem is caused by the table property being lost in the snapshot. 
> Can anyone suggest how I can avoid this problem?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24597) RocksdbStateBackend getKeysAndNamespaces would return duplicate data when using MapState

2021-11-01 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17436635#comment-17436635
 ] 

Yun Tang commented on FLINK-24597:
--

[~mayuehappy] Already assigned to you, please go ahead.

> RocksdbStateBackend getKeysAndNamespaces would return duplicate data when 
> using MapState 
> -
>
> Key: FLINK-24597
> URL: https://issues.apache.org/jira/browse/FLINK-24597
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor, Runtime / State Backends
>Affects Versions: 1.14.0, 1.12.4, 1.13.3
>Reporter: Yue Ma
>Assignee: Yue Ma
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-11-01-14-19-58-372.png
>
>
> For example, in RocksdbStateBackend, if we work in VoidNamespace and use 
> ValueState like below:
> {code:java}
> // insert record
> for (int i = 0; i < 3; ++i) {
>     keyedStateBackend.setCurrentKey(i);
>     testValueState.update(String.valueOf(i));
> }
> {code}
> Then we get all the keysAndNamespaces via 
> RocksDBKeyedStateBackend#getKeysAndNamespaces(). The result of the traversal is
>  <1,VoidNamespace>,<2,VoidNamespace>,<3,VoidNamespace>, which is as expected.
> However, if we use MapState and update the MapState with different user keys, 
> getKeysAndNamespaces would return duplicate data with the same 
> keyAndNamespace.
> {code:java}
> // insert record
> for (int i = 0; i < 3; ++i) {
>     keyedStateBackend.setCurrentKey(i);
>     mapState.put("userKeyA_" + i, "userValue");
>     mapState.put("userKeyB_" + i, "userValue");
> }
> {code}
> The result of the traversal is
>  <1,VoidNamespace>,<1,VoidNamespace>,<2,VoidNamespace>,<2,VoidNamespace>,<3,VoidNamespace>,<3,VoidNamespace>.
> By reading the code, I found that the main reason for this problem is in the 
> implementation of _RocksStateKeysAndNamespaceIterator_:
> in the _hasNext_ method, when a new keyAndNamespace is created, there is no 
> comparison with the previousKeyAndNamespace. So referring to 
> RocksStateKeysIterator and implementing the same logic should solve this problem.
>   
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-24597) RocksdbStateBackend getKeysAndNamespaces would return duplicate data when using MapState

2021-11-01 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-24597:


Assignee: Yue Ma

> RocksdbStateBackend getKeysAndNamespaces would return duplicate data when 
> using MapState 
> -
>
> Key: FLINK-24597
> URL: https://issues.apache.org/jira/browse/FLINK-24597
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor, Runtime / State Backends
>Affects Versions: 1.14.0, 1.12.4, 1.13.3
>Reporter: Yue Ma
>Assignee: Yue Ma
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-11-01-14-19-58-372.png
>
>
> For example, in RocksdbStateBackend, if we work in VoidNamespace and use 
> ValueState like below:
> {code:java}
> // insert record
> for (int i = 0; i < 3; ++i) {
>     keyedStateBackend.setCurrentKey(i);
>     testValueState.update(String.valueOf(i));
> }
> {code}
> Then we get all the keysAndNamespaces via 
> RocksDBKeyedStateBackend#getKeysAndNamespaces(). The result of the traversal is
>  <1,VoidNamespace>,<2,VoidNamespace>,<3,VoidNamespace>, which is as expected.
> However, if we use MapState and update the MapState with different user keys, 
> getKeysAndNamespaces would return duplicate data with the same 
> keyAndNamespace.
> {code:java}
> // insert record
> for (int i = 0; i < 3; ++i) {
>     keyedStateBackend.setCurrentKey(i);
>     mapState.put("userKeyA_" + i, "userValue");
>     mapState.put("userKeyB_" + i, "userValue");
> }
> {code}
> The result of the traversal is
>  <1,VoidNamespace>,<1,VoidNamespace>,<2,VoidNamespace>,<2,VoidNamespace>,<3,VoidNamespace>,<3,VoidNamespace>.
> By reading the code, I found that the main reason for this problem is in the 
> implementation of _RocksStateKeysAndNamespaceIterator_:
> in the _hasNext_ method, when a new keyAndNamespace is created, there is no 
> comparison with the previousKeyAndNamespace. So referring to 
> RocksStateKeysIterator and implementing the same logic should solve this problem.
>   
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21543) when using FIFO compaction, I found sst being deleted on the first checkpoint

2021-10-29 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17435847#comment-17435847
 ] 

Yun Tang commented on FLINK-21543:
--

Since RocksDB's FIFO compaction would cause data loss quietly, it cannot be 
used in open-source Flink, so I'll close this ticket. 
[~zhoujira86] your special use case is interesting, and maybe you could share 
more information if you still choose this approach.

> when using FIFO compaction, I found sst being deleted on the first checkpoint
> -
>
> Key: FLINK-21543
> URL: https://issues.apache.org/jira/browse/FLINK-21543
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: xiaogang zhou
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
> Attachments: LOG (2), image-2021-03-03-11-35-11-458.png, 
> image-2021-03-03-13-09-01-695.png
>
>
> 2021/03/01-18:51:01.202049 7f59042fc700 (Original Log Time 
> 2021/03/01-18:51:01.200883) [/compaction/compaction_picker_fifo.cc:107] 
> [_timer_state/processing_user-timers] FIFO compaction: picking file 1710 with 
> creation time 0 for deletion
>  
> the configuration is like 
> {code:java}
> currentOptions.setCompactionStyle(getCompactionStyle());
> currentOptions.setLevel0FileNumCompactionTrigger(8);
> // currentOptions.setMaxTableFilesSizeFIFO(MemorySize.parse("2gb").getBytes());
> CompactionOptionsFIFO compactionOptionsFIFO = new CompactionOptionsFIFO();
> compactionOptionsFIFO.setMaxTableFilesSize(MemorySize.parse("8gb").getBytes());
> compactionOptionsFIFO.setAllowCompaction(true);
> {code}
> and the rocksdb version is
> {code:xml}
> <dependency>
>   <groupId>io.github.myasuka</groupId>
>   <artifactId>frocksdbjni</artifactId>
>   <version>6.10.2-ververica-3.0</version>
> </dependency>
> {code}
> I think the problem is caused by the table property being lost in the snapshot. 
> Can anyone suggest how I can avoid this problem?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-21543) when using FIFO compaction, I found sst being deleted on the first checkpoint

2021-10-29 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-21543.

Resolution: Information Provided

> when using FIFO compaction, I found sst being deleted on the first checkpoint
> -
>
> Key: FLINK-21543
> URL: https://issues.apache.org/jira/browse/FLINK-21543
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: xiaogang zhou
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
> Attachments: LOG (2), image-2021-03-03-11-35-11-458.png, 
> image-2021-03-03-13-09-01-695.png
>
>
> 2021/03/01-18:51:01.202049 7f59042fc700 (Original Log Time 
> 2021/03/01-18:51:01.200883) [/compaction/compaction_picker_fifo.cc:107] 
> [_timer_state/processing_user-timers] FIFO compaction: picking file 1710 with 
> creation time 0 for deletion
>  
> the configuration is like 
> {code:java}
> currentOptions.setCompactionStyle(getCompactionStyle());
> currentOptions.setLevel0FileNumCompactionTrigger(8);
> // currentOptions.setMaxTableFilesSizeFIFO(MemorySize.parse("2gb").getBytes());
> CompactionOptionsFIFO compactionOptionsFIFO = new CompactionOptionsFIFO();
> compactionOptionsFIFO.setMaxTableFilesSize(MemorySize.parse("8gb").getBytes());
> compactionOptionsFIFO.setAllowCompaction(true);
> {code}
> and the rocksdb version is
> {code:xml}
> <dependency>
>   <groupId>io.github.myasuka</groupId>
>   <artifactId>frocksdbjni</artifactId>
>   <version>6.10.2-ververica-3.0</version>
> </dependency>
> {code}
> I think the problem is caused by the table property being lost in the snapshot. 
> Can anyone suggest how I can avoid this problem?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24597) RocksdbStateBackend getKeysAndNamespaces would return duplicate data when using MapState

2021-10-29 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17435842#comment-17435842
 ] 

Yun Tang commented on FLINK-24597:
--

I think this is valid. The current {{RocksStateKeysIterator}}, which is used in 
{{#getKeys}}, filters out the same primary key via {{!Objects.equals(previousKey, 
currentKey)}}; we at least need to do similar things in 
{{#getKeysAndNamespaces}}. Regarding performance, since we could have a 
variable-length key serializer, it might not be easy to avoid the deserialization.
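A minimal, generic sketch of the duplicate-skipping logic discussed above. It is not the actual RocksStateKeysAndNamespaceIterator code; it only shows the previous/current comparison applied to an already-sorted stream of (key, namespace) pairs.
{code:java}
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;

/**
 * Skips consecutive duplicates from an already-sorted iterator, mirroring the
 * previous/current comparison that RocksStateKeysIterator performs for plain keys.
 */
public class DedupIterator<T> implements Iterator<T> {

    private final Iterator<T> delegate;
    private T previous;
    private T next;

    public DedupIterator(Iterator<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean hasNext() {
        while (next == null && delegate.hasNext()) {
            T candidate = delegate.next();
            if (!Objects.equals(previous, candidate)) {
                // A new (key, namespace) pair; remember it and emit it.
                previous = candidate;
                next = candidate;
            }
            // Otherwise: same pair as before (e.g. another user key of the MapState), skip it.
        }
        return next != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = next;
        next = null;
        return result;
    }
}
{code}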

> RocksdbStateBackend getKeysAndNamespaces would return duplicate data when 
> using MapState 
> -
>
> Key: FLINK-24597
> URL: https://issues.apache.org/jira/browse/FLINK-24597
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor, Runtime / State Backends
>Affects Versions: 1.14.0, 1.12.4, 1.13.3
>Reporter: Yue Ma
>Priority: Major
>  Labels: pull-request-available
>
> For example, in RocksdbStateBackend, if we work in VoidNamespace and use 
> ValueState like below:
> {code:java}
> // insert record
> for (int i = 0; i < 3; ++i) {
>     keyedStateBackend.setCurrentKey(i);
>     testValueState.update(String.valueOf(i));
> }
> {code}
> Then we get all the keysAndNamespaces via 
> RocksDBKeyedStateBackend#getKeysAndNamespaces(). The result of the traversal is
>  <1,VoidNamespace>,<2,VoidNamespace>,<3,VoidNamespace>, which is as expected.
> However, if we use MapState and update the MapState with different user keys, 
> getKeysAndNamespaces would return duplicate data with the same 
> keyAndNamespace.
> {code:java}
> // insert record
> for (int i = 0; i < 3; ++i) {
>     keyedStateBackend.setCurrentKey(i);
>     mapState.put("userKeyA_" + i, "userValue");
>     mapState.put("userKeyB_" + i, "userValue");
> }
> {code}
> The result of the traversal is
>  <1,VoidNamespace>,<1,VoidNamespace>,<2,VoidNamespace>,<2,VoidNamespace>,<3,VoidNamespace>,<3,VoidNamespace>.
> By reading the code, I found that the main reason for this problem is in the 
> implementation of _RocksStateKeysAndNamespaceIterator_:
> in the _hasNext_ method, when a new keyAndNamespace is created, there is no 
> comparison with the previousKeyAndNamespace. So referring to 
> RocksStateKeysIterator and implementing the same logic should solve this problem.
>   
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-24678) Correct the metric name of map state contains latency

2021-10-28 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-24678.
--
Resolution: Fixed

Merged
master: b1a409ef1c53a24d717c789eea8c09ddbc94f7f1
release-1.14: 314fd0a14dfc4bfaa122bc2d7c1bc4f253216f7a
release-1.13: 139bba8b93ae1870f11da0adff5bc4ab67bf73a9

> Correct the metric name of map state contains latency
> -
>
> Key: FLINK-24678
> URL: https://issues.apache.org/jira/browse/FLINK-24678
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.14.0, 1.13.3
>Reporter: Yun Tang
>Assignee: Jinzhong Li
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1, 1.13.4
>
>
> The current metric name for the map state contains latency is {{mapStateContainsAllLatency}}, 
> which should be {{mapStateContainsLatency}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24678) Correct the metric name of map state contains latency

2021-10-28 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-24678:
-
Fix Version/s: 1.15.0

> Correct the metric name of map state contains latency
> -
>
> Key: FLINK-24678
> URL: https://issues.apache.org/jira/browse/FLINK-24678
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.14.0, 1.13.3
>Reporter: Yun Tang
>Assignee: Jinzhong Li
>Priority: Minor
> Fix For: 1.15.0, 1.14.1, 1.13.4
>
>
> The current metric name for the map state contains latency is {{mapStateContainsAllLatency}}, 
> which should be {{mapStateContainsLatency}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-24678) Correct the metric name of map state contains latency

2021-10-28 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-24678:


Assignee: Jinzhong Li  (was: Yun Tang)

> Correct the metric name of map state contains latency
> -
>
> Key: FLINK-24678
> URL: https://issues.apache.org/jira/browse/FLINK-24678
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.14.0, 1.13.3
>Reporter: Yun Tang
>Assignee: Jinzhong Li
>Priority: Minor
> Fix For: 1.14.1, 1.13.4
>
>
> The current metric name for the map state contains latency is {{mapStateContainsAllLatency}}, 
> which should be {{mapStateContainsLatency}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24678) Correct the metric name of map state contains latency

2021-10-28 Thread Yun Tang (Jira)
Yun Tang created FLINK-24678:


 Summary: Correct the metric name of map state contains latency
 Key: FLINK-24678
 URL: https://issues.apache.org/jira/browse/FLINK-24678
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.13.3, 1.14.0
Reporter: Yun Tang
Assignee: Yun Tang
 Fix For: 1.14.1, 1.13.4


The current metric name for the map state contains latency is {{mapStateContainsAllLatency}}, 
which should be {{mapStateContainsLatency}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24667) Channel state writer would fail the task directly if meeting exception previously

2021-10-27 Thread Yun Tang (Jira)
Yun Tang created FLINK-24667:


 Summary: Channel state writer would fail the task directly if 
meeting exception previously
 Key: FLINK-24667
 URL: https://issues.apache.org/jira/browse/FLINK-24667
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / Task
Affects Versions: 1.13.3, 1.14.0
Reporter: Yun Tang


Currently, if the channel state writer comes across an exception when closing a file, 
for example an exception during 
{{SubtaskCheckpointCoordinatorImpl#cancelAsyncCheckpointRunnable}}, it will 
exit the loop. However, the following {{channelStateWriter#abort}} will then 
throw the exception directly:


{code:java}
switched from RUNNING to FAILED with failure cause: java.io.IOException: 
java.lang.RuntimeException: unable to send request to worker
at 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.checkError(InputChannel.java:228)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.checkPartitionRequestQueueInitialized(RemoteInputChannel.java:735)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:204)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:651)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:626)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:612)
at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:109)
at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:149)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:424)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:685)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:640)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:651)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:624)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:798)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
{code}

This is not expected, as a checkpoint failure should not lead to a task failover 
every time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-24546) Acknowledged description miss on Monitoring Checkpointing page

2021-10-20 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-24546.
--
Fix Version/s: 1.15.0
   Resolution: Fixed

merged in master: 6b405f6318b82d759fbd93f9a6af213cda72374d

> Acknowledged description miss on Monitoring Checkpointing page
> --
>
> Key: FLINK-24546
> URL: https://issues.apache.org/jira/browse/FLINK-24546
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: camilesing
>Assignee: camilesing
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0
>
> Attachments: image-2021-10-14-19-26-33-289.png
>
>
> !image-2021-10-14-19-26-33-289.png!
> Acknowledged description miss on Monitoring Checkpointing page



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24546) Acknowledged description miss on Monitoring Checkpointing page

2021-10-20 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17430987#comment-17430987
 ] 

Yun Tang commented on FLINK-24546:
--

[~Alston Williams], thanks for your enthusiasm!
As I wrote in a [previous 
comment|https://github.com/apache/flink/pull/17482#issuecomment-947302284], I 
think you could create an issue to help improve the documentation of the subtask details 
in the history tab, which is missing the description of Processed (persisted) data.

> Acknowledged description miss on Monitoring Checkpointing page
> --
>
> Key: FLINK-24546
> URL: https://issues.apache.org/jira/browse/FLINK-24546
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: camilesing
>Assignee: camilesing
>Priority: Minor
>  Labels: pull-request-available
> Attachments: image-2021-10-14-19-26-33-289.png
>
>
> !image-2021-10-14-19-26-33-289.png!
> Acknowledged description miss on Monitoring Checkpointing page



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-24546) Acknowledged description miss on Monitoring Checkpointing page

2021-10-20 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-24546:


Assignee: camilesing

> Acknowledged description miss on Monitoring Checkpointing page
> --
>
> Key: FLINK-24546
> URL: https://issues.apache.org/jira/browse/FLINK-24546
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: camilesing
>Assignee: camilesing
>Priority: Minor
>  Labels: pull-request-available
> Attachments: image-2021-10-14-19-26-33-289.png
>
>
> !image-2021-10-14-19-26-33-289.png!
> Acknowledged description miss on Monitoring Checkpointing page



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-24432) RocksIteratorWrapper.seekToLast() calls the wrong RocksIterator method

2021-10-14 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-24432.
--
Fix Version/s: 1.14.1
   1.15.0
   Resolution: Fixed

merged in master: ae531b5888667abd0b194e0372bdde03581de97c

merged in release-1.14: ba6a8cd72c0abe12e50d09a5f42d74d2bea27e42

> RocksIteratorWrapper.seekToLast() calls the wrong RocksIterator method
> --
>
> Key: FLINK-24432
> URL: https://issues.apache.org/jira/browse/FLINK-24432
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.0
>Reporter: Victor Xu
>Assignee: Victor Xu
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1
>
>
> RocksIteratorWrapper is a wrapper around RocksIterator that does an additional 
> status check for all the methods. However, there's a typo: the 
> RocksIteratorWrapper.*seekToLast*() method calls RocksIterator's 
> *seekToFirst*(), which is obviously wrong. I guess this issue wasn't found 
> before as the method is only referenced in 
> RocksTransformingIteratorWrapper.seekToLast() and nowhere else.
> {code:java}
> @Override
> public void seekToFirst() {
>  iterator.seekToFirst();
>  status();
> }
> @Override
> public void seekToLast() {
>  iterator.seekToFirst();
>  status();
> }{code}
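For reference, the corrected wrapper method presumably just delegates to the matching RocksIterator call; a sketch (the merged fix may differ in detail):
{code:java}
@Override
public void seekToLast() {
    // Delegate to the matching RocksIterator method instead of seekToFirst().
    iterator.seekToLast();
    status();
}
{code}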



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-24460) Rocksdb Iterator Error Handling Improvement

2021-10-13 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-24460.
--
Fix Version/s: 1.14.1
   1.15.0
   Resolution: Fixed

merged in master:
77b8e9c042d1d4d2779afc7edaf0d4ae9eaa

merged in release-1.14:
1f17b28f4ddb8b3896e1a43de63d83feccf03c65

> Rocksdb Iterator Error Handling Improvement
> ---
>
> Key: FLINK-24460
> URL: https://issues.apache.org/jira/browse/FLINK-24460
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.14.0
>Reporter: Victor Xu
>Assignee: Victor Xu
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1
>
>
> In FLINK-9373, we introduced RocksIteratorWrapper which was a wrapper around 
> RocksIterator to check the iterator status for all the methods. At that time, 
> it was required because the iterator may pass the blocks or files it had 
> difficulties in reading (because of IO errors, data corruptions, or other 
> issues) and continue with the next available keys. *The status flag may not 
> be OK, even if the iterator is valid.*
> However, the above behaviour changed after 
> [3810|https://github.com/facebook/rocksdb/pull/3810] was merged on May 17, 
> 2018:
>  *- If the iterator is valid, the status() is guaranteed to be OK;*
>  *- If the iterator is not valid, there are two possibilities:*
>     *1) We have reached the end of the data. And in this case, status() is 
> OK;*
>     *2) There is an error. In this case, status() is not OK;*
> More information can be found here: 
> https://github.com/facebook/rocksdb/wiki/Iterator#error-handling
> Thus, it should be safe to proceed with other operations (e.g. seek, next, 
> seekToFirst, seekToLast, seekForPrev, and prev) without checking status(). 
> And we only need to check the status if the iterator is invalid. After the 
> change, there will be fewer status() native calls, which could theoretically 
> improve performance.
>  
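A minimal sketch of the check-status-only-when-invalid pattern described above (illustrative only, not the actual RocksIteratorWrapper code):
{code:java}
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

public class IteratorStatusCheck {

    /**
     * Per the RocksDB contract since facebook/rocksdb#3810: a valid iterator implies OK status,
     * so status() only needs to be consulted once the iterator becomes invalid.
     */
    public static void checkAfterIteration(RocksIterator iterator) throws RocksDBException {
        if (!iterator.isValid()) {
            // Throws if the iteration stopped because of an error rather than end-of-data.
            iterator.status();
        }
    }
}
{code}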



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24460) Rocksdb Iterator Error Handling Improvement

2021-10-10 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17426944#comment-17426944
 ] 

Yun Tang commented on FLINK-24460:
--

This fix should be useful. [~victorunique], I have already assigned this ticket to you.

> Rocksdb Iterator Error Handling Improvement
> ---
>
> Key: FLINK-24460
> URL: https://issues.apache.org/jira/browse/FLINK-24460
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.14.0
>Reporter: Victor Xu
>Assignee: Victor Xu
>Priority: Minor
>  Labels: pull-request-available
>
> In FLINK-9373, we introduced RocksIteratorWrapper which was a wrapper around 
> RocksIterator to check the iterator status for all the methods. At that time, 
> it was required because the iterator may pass the blocks or files it had 
> difficulties in reading (because of IO errors, data corruptions, or other 
> issues) and continue with the next available keys. *The status flag may not 
> be OK, even if the iterator is valid.*
> However, the above behaviour changed after 
> [3810|https://github.com/facebook/rocksdb/pull/3810] was merged on May 17, 
> 2018:
>  *- If the iterator is valid, the status() is guaranteed to be OK;*
>  *- If the iterator is not valid, there are two possibilities:*
>     *1) We have reached the end of the data. And in this case, status() is 
> OK;*
>     *2) There is an error. In this case, status() is not OK;*
> More information can be found here: 
> https://github.com/facebook/rocksdb/wiki/Iterator#error-handling
> Thus, it should be safe to proceed with other operations (e.g. seek, next, 
> seekToFirst, seekToLast, seekForPrev, and prev) without checking status(). 
> And we only need to check the status if the iterator is invalid. After the 
> change, there will be fewer status() native calls, which could theoretically 
> improve performance.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-24460) Rocksdb Iterator Error Handling Improvement

2021-10-10 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-24460:


Assignee: Victor Xu

> Rocksdb Iterator Error Handling Improvement
> ---
>
> Key: FLINK-24460
> URL: https://issues.apache.org/jira/browse/FLINK-24460
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.14.0
>Reporter: Victor Xu
>Assignee: Victor Xu
>Priority: Minor
>  Labels: pull-request-available
>
> In FLINK-9373, we introduced RocksIteratorWrapper which was a wrapper around 
> RocksIterator to check the iterator status for all the methods. At that time, 
> it was required because the iterator may pass the blocks or files it had 
> difficulties in reading (because of IO errors, data corruptions, or other 
> issues) and continue with the next available keys. *The status flag may not 
> be OK, even if the iterator is valid.*
> However, the above behaviour changed after 
> [3810|https://github.com/facebook/rocksdb/pull/3810] was merged on May 17, 
> 2018:
>  *- If the iterator is valid, the status() is guaranteed to be OK;*
>  *- If the iterator is not valid, there are two possibilities:*
>     *1) We have reached the end of the data. And in this case, status() is 
> OK;*
>     *2) There is an error. In this case, status() is not OK;*
> More information can be found here: 
> https://github.com/facebook/rocksdb/wiki/Iterator#error-handling
> Thus, it should be safe to proceed with other operations (e.g. seek, next, 
> seekToFirst, seekToLast, seekForPrev, and prev) without checking status(). 
> And we only need to check the status if the iterator is invalid. After the 
> change, there will be fewer status() native calls, which could theoretically 
> improve performance.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-24148) Add bloom filter policy option in RocksDBConfiguredOptions

2021-09-30 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-24148.
--
Resolution: Fixed

merged in release-1.14: 7882d5e5e7d6c6bd0c12f659b4c92829703a2b29

> Add bloom filter policy option in RocksDBConfiguredOptions
> --
>
> Key: FLINK-24148
> URL: https://issues.apache.org/jira/browse/FLINK-24148
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.13.2, 1.14.1
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1
>
>
> A bloom filter can efficiently improve read performance on RocksDB, especially 
> for reads among L0 files. (For more details see 
> https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter)
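
For illustration, a similar effect can already be achieved programmatically through a RocksDBOptionsFactory, independent of the new configuration option; a minimal sketch (the 10 bits per key and full-filter mode are arbitrary example values, and exact RocksDB setter names may vary slightly between RocksDB versions):

{code:java}
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

import java.util.Collection;

public class BloomFilterOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Example: 10 bits per key, full (non block-based) bloom filter.
        BloomFilter bloomFilter = new BloomFilter(10, false);
        handlesToClose.add(bloomFilter);
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig().setFilterPolicy(bloomFilter);
        return currentOptions.setTableFormatConfig(tableConfig);
    }
}
{code}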



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-23519) Aggregate State Backend Latency by State Level

2021-09-30 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-23519.
--
Resolution: Fixed

Merged in release-1.14: b5fab64432c9839f956c9c169f617edf1026669f

> Aggregate State Backend Latency by State Level
> --
>
> Key: FLINK-23519
> URL: https://issues.apache.org/jira/browse/FLINK-23519
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / State Backends
>Affects Versions: 1.13.0
>Reporter: Mason Chen
>Assignee: Yun Tang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.13.3, 1.15.0, 1.14.1
>
>
> To make metrics aggregation easier, there should be a config to expose 
> something like `state.backend.rocksdb.metrics.column-family-as-variable` that 
> rocksdb provides to do aggregation across column families 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-metrics-column-family-as-variable]).
>  
> In the case of state backend latency, the variable exposed would be the 
> state level instead of the column family. This makes it easier to aggregate 
> by the various state levels that are reported.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23519) Aggregate State Backend Latency by State Level

2021-09-23 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17419576#comment-17419576
 ] 

Yun Tang commented on FLINK-23519:
--

merged in
master (1.15.0): e2bda3df78b0a7d16acf81611d3768e12309a1b4
release-1.13: adfbbcbe03783ce3cbe9fff4aaa2bca9ab7f2be7

> Aggregate State Backend Latency by State Level
> --
>
> Key: FLINK-23519
> URL: https://issues.apache.org/jira/browse/FLINK-23519
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / State Backends
>Affects Versions: 1.13.0
>Reporter: Mason Chen
>Assignee: Yun Tang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.13.3, 1.15.0, 1.14.1
>
>
> To make metrics aggregation easier, there should be a config to expose 
> something like `state.backend.rocksdb.metrics.column-family-as-variable` that 
> rocksdb provides to do aggregation across column families 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-metrics-column-family-as-variable]).
>  
> In the case of state backend latency, the variable exposed would be the 
> state level instead of the column family. This makes it easier to aggregate 
> by the various state levels that are reported.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23949) first incremental checkpoint after a savepoint will degenerate into a full checkpoint

2021-09-18 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17417115#comment-17417115
 ] 

Yun Tang commented on FLINK-23949:
--

merged in release-1.12: b84223034a55a16df4d977f5be1ff9e6e3d4291a

> first incremental checkpoint after a savepoint will degenerate into a full 
> checkpoint
> -
>
> Key: FLINK-23949
> URL: https://issues.apache.org/jira/browse/FLINK-23949
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.11.4, 1.12.5, 1.13.2
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.3, 1.15.0
>
> Attachments: image-2021-08-25-00-59-05-779.png
>
>
> In RocksIncrementalSnapshotStrategy we record the uploaded RocksDB files 
> corresponding to the checkpoint id, and clean them up in 
> _CheckpointListener#notifyCheckpointComplete_.
> {code:java}
> @Override
> public void notifyCheckpointComplete(long completedCheckpointId) {
> synchronized (materializedSstFiles) {
> if (completedCheckpointId > lastCompletedCheckpointId) {
> materializedSstFiles
> .keySet()
> .removeIf(checkpointId -> checkpointId < 
> completedCheckpointId);
> lastCompletedCheckpointId = completedCheckpointId;
> }
> }
> }{code}
>  
> This works well without savepoints, but when a savepoint completes, it will 
> clean up the _materializedSstFiles_ of the previous checkpoint. As a result, 
> the first checkpoint after the savepoint must upload all files in RocksDB.
> !image-2021-08-25-00-59-05-779.png|width=1188,height=163!
> Solving the problem is also very simple: I propose to clean 
> _materializedSstFiles_ and update _lastCompletedCheckpointId_ only when 
> {color:#ff}_materializedSstFiles.keySet().contains(completedCheckpointId)_{color}.
> If a _completedCheckpointId_ is not in _materializedSstFiles.keySet()_, 
> there are only two cases:
> 1. It is a checkpoint, but a checkpoint with a larger id completed before it
> 2. It is a savepoint (savepoints are not produced by 
> RocksIncrementalSnapshotStrategy)
> In either case we don't need to clean _materializedSstFiles_ or update 
> _lastCompletedCheckpointId_ anymore.
> [~yunta] , [~trohrmann] , I have submitted a pull request to solve this 
> problem, please evaluate whether it is appropriate.
>  
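
A minimal sketch of the proposed guard (names follow the snippet quoted above; this illustrates the idea, not necessarily the merged change):

{code:java}
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
    synchronized (materializedSstFiles) {
        // Only react to ids that this strategy actually recorded. A savepoint id
        // (or an older checkpoint id) is ignored, so the SST files recorded for
        // the last completed checkpoint survive the savepoint.
        if (materializedSstFiles.containsKey(completedCheckpointId)) {
            materializedSstFiles.keySet().removeIf(id -> id < completedCheckpointId);
            lastCompletedCheckpointId = completedCheckpointId;
        }
    }
}
{code}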



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23949) first incremental checkpoint after a savepoint will degenerate into a full checkpoint

2021-09-17 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-23949:
-
Component/s: (was: Runtime / State Backends)
 Runtime / Checkpointing

> first incremental checkpoint after a savepoint will degenerate into a full 
> checkpoint
> -
>
> Key: FLINK-23949
> URL: https://issues.apache.org/jira/browse/FLINK-23949
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.4, 1.12.5, 1.13.2
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.3, 1.15.0
>
> Attachments: image-2021-08-25-00-59-05-779.png
>
>
> In RocksIncrementalSnapshotStrategy we will record the uploaded rocksdb files 
> corresponding to the checkpoint id,and clean it in 
> _CheckpointListener#notifyCheckpointComplete ._
> {code:java}
> @Override
> public void notifyCheckpointComplete(long completedCheckpointId) {
> synchronized (materializedSstFiles) {
> if (completedCheckpointId > lastCompletedCheckpointId) {
> materializedSstFiles
> .keySet()
> .removeIf(checkpointId -> checkpointId < 
> completedCheckpointId);
> lastCompletedCheckpointId = completedCheckpointId;
> }
> }
> }{code}
>  
> This works well without savepoint, but when a savepoint is completed, it will 
> clean up the _materializedSstFiles_ of the previous checkpoint. It leads to 
> the first checkpoint after the savepoint must upload all files in rocksdb.
> !image-2021-08-25-00-59-05-779.png|width=1188,height=163!
> Solving the problem is also very simple, I propose to clean 
> _materializedSstFiles_ and update  _lastCompletedCheckpointId_ only when 
> {color:#ff}_materializedSstFiles.keySet().contains(completedCheckpointId)_{color}
>  .
> If a _completedCheckpointId_ is not in _materializedSstFiles.keySet()_ , 
> there are only two cases:
> 1. It is a checkpoint but there is a checkpoint with larger id number 
> completed before it
> 2. It is a savepoint (savepoint not produce by 
> RocksIncrementalSnapshotStrategy)
> In either case we don’t need clean _materializedSstFiles_ and update  
> _lastCompletedCheckpointId_  anymore.
> [~yunta] , [~trohrmann] , I have submitted a pull request to solve this 
> problem, please evaluate whether it is appropriate.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23949) first incremental checkpoint after a savepoint will degenerate into a full checkpoint

2021-09-17 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-23949:
-
Component/s: Runtime / State Backends

> first incremental checkpoint after a savepoint will degenerate into a full 
> checkpoint
> -
>
> Key: FLINK-23949
> URL: https://issues.apache.org/jira/browse/FLINK-23949
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.11.4, 1.12.5, 1.13.2
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.3, 1.15.0
>
> Attachments: image-2021-08-25-00-59-05-779.png
>
>
> In RocksIncrementalSnapshotStrategy we will record the uploaded rocksdb files 
> corresponding to the checkpoint id,and clean it in 
> _CheckpointListener#notifyCheckpointComplete ._
> {code:java}
> @Override
> public void notifyCheckpointComplete(long completedCheckpointId) {
> synchronized (materializedSstFiles) {
> if (completedCheckpointId > lastCompletedCheckpointId) {
> materializedSstFiles
> .keySet()
> .removeIf(checkpointId -> checkpointId < 
> completedCheckpointId);
> lastCompletedCheckpointId = completedCheckpointId;
> }
> }
> }{code}
>  
> This works well without savepoint, but when a savepoint is completed, it will 
> clean up the _materializedSstFiles_ of the previous checkpoint. It leads to 
> the first checkpoint after the savepoint must upload all files in rocksdb.
> !image-2021-08-25-00-59-05-779.png|width=1188,height=163!
> Solving the problem is also very simple, I propose to clean 
> _materializedSstFiles_ and update  _lastCompletedCheckpointId_ only when 
> {color:#ff}_materializedSstFiles.keySet().contains(completedCheckpointId)_{color}
>  .
> If a _completedCheckpointId_ is not in _materializedSstFiles.keySet()_ , 
> there are only two cases:
> 1. It is a checkpoint but there is a checkpoint with larger id number 
> completed before it
> 2. It is a savepoint (savepoint not produce by 
> RocksIncrementalSnapshotStrategy)
> In either case we don’t need clean _materializedSstFiles_ and update  
> _lastCompletedCheckpointId_  anymore.
> [~yunta] , [~trohrmann] , I have submitted a pull request to solve this 
> problem, please evaluate whether it is appropriate.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23180) Initialize checkpoint location lazily in DataStream Batch Jobs

2021-09-17 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-23180:
-
Release Note:   (was: merged in master:
164a59ac1bb4dd39e6532478c30234eeafd76cd0)

merged in master:
164a59ac1bb4dd39e6532478c30234eeafd76cd0

> Initialize checkpoint location lazily in DataStream Batch Jobs
> --
>
> Key: FLINK-23180
> URL: https://issues.apache.org/jira/browse/FLINK-23180
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.0
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Currently batch jobs will initialize checkpoint location eagerly when 
> {{CheckpointCoordinator}} is created, which will create lots of useless 
> directories on distributed filesystem. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24240) HA JobGraph deserialization problem when migrate 1.12.4 to 1.13.2

2021-09-17 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-24240:
-
Component/s: (was: Runtime / State Backends)
 Runtime / Coordination

> HA JobGraph deserialization problem when migrate 1.12.4 to 1.13.2
> -
>
> Key: FLINK-24240
> URL: https://issues.apache.org/jira/browse/FLINK-24240
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.2
>Reporter: Zheren Yu
>Priority: Major
>
> We are using HA with Flink on k8s, which creates a configmap like 
> `xxx-dispatcher-leader` and puts the jobGraph inside it. Once we update the 
> version from 1.12.4 to 1.13.2 without stopping the job, the jobGraph created 
> by the old version is deserialized and lacks the jobType field, which causes 
> the problem below:
> {code:java}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory$PartitionLocationConstraint.fromJobType(TaskDeploymentDescriptorFactory.java:282)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:347)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) 
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>  ~[?:1.8.0_302]
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> ~[?:1.8.0_302]
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> ~[?:1.8.0_302]
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  ~[?:1.8.0_302]
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  ~[?:1.8.0_302]
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  ~[?:1.8.0_302]
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  ~[?:1.8.0_302]
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> I just wandering do we have any workaround with this?
> (although I know manually stopping the job may work)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23180) Initialize checkpoint location lazily in DataStream Batch Jobs

2021-09-16 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-23180:
-
Fix Version/s: 1.15.0

> Initialize checkpoint location lazily in DataStream Batch Jobs
> --
>
> Key: FLINK-23180
> URL: https://issues.apache.org/jira/browse/FLINK-23180
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.0
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Currently batch jobs will initialize checkpoint location eagerly when 
> {{CheckpointCoordinator}} is created, which will create lots of useless 
> directories on distributed filesystem. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-23180) Initialize checkpoint location lazily in DataStream Batch Jobs

2021-09-16 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-23180.
--
Release Note: 
merged in master:
164a59ac1bb4dd39e6532478c30234eeafd76cd0
  Resolution: Fixed

> Initialize checkpoint location lazily in DataStream Batch Jobs
> --
>
> Key: FLINK-23180
> URL: https://issues.apache.org/jira/browse/FLINK-23180
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.0
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Major
>  Labels: pull-request-available
>
> Currently batch jobs will initialize checkpoint location eagerly when 
> {{CheckpointCoordinator}} is created, which will create lots of useless 
> directories on distributed filesystem. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24149) Make checkpoint self-contained and relocatable

2021-09-13 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17414676#comment-17414676
 ] 

Yun Tang commented on FLINK-24149:
--

[~Feifan Wang] My core idea is that we should avoid any misuse by customers 
and make this whole phase easy to use. From my view, using the 
state-processor-api to rewrite the checkpoint is not so convenient.

We have several options:
* Use a tool that modifies the checkpoint metadata and calls the distributed 
file system to copy the remote files to another namespace.
* Copy the checkpoint directories directly to another namespace without any 
additional changes (just like what you provided in this ticket, but the 
current solution cannot satisfy the case of a checkpoint containing multiple 
previous incremental checkpoint directories).


> Make checkpoint self-contained and relocatable
> --
>
> Key: FLINK-24149
> URL: https://issues.apache.org/jira/browse/FLINK-24149
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-09-08-17-06-31-560.png, 
> image-2021-09-08-17-10-28-240.png, image-2021-09-08-17-55-46-898.png, 
> image-2021-09-08-18-01-03-176.png
>
>
> h1. Backgroud
> We have many jobs with large state size in production environment. According 
> to the operation practice of these jobs and the analysis of some specific 
> problems, we believe that RocksDBStateBackend's incremental checkpoint has 
> many advantages over savepoint:
>  # Savepoint takes much longer time then incremental checkpoint in jobs with 
> large state. The figure below is a job in our production environment, it 
> takes nearly 7 minutes to complete a savepoint, while checkpoint only takes a 
> few seconds.( checkpoint after savepoint takes longer time is a problem 
> described in -FLINK-23949-)
>  !image-2021-09-08-17-55-46-898.png|width=723,height=161!
>  # Savepoint causes excessive cpu usage. The figure below shows the CPU usage 
> of the same job in the above figure :
>  !image-2021-09-08-18-01-03-176.png|width=516,height=148!
>  # Savepoint may cause excessive native memory usage and eventually cause the 
> TaskManager process memory usage to exceed the limit. (We did not further 
> investigate the cause and did not try to reproduce the problem on other large 
> state jobs, but only increased the overhead memory. So this reason may not be 
> so conclusive. )
> For the above reasons, we tend to use retained incremental checkpoint to 
> completely replace savepoint for jobs with large state size.
> h1. Problems
>  * *Problem 1 : retained incremental checkpoint difficult to clean up once 
> they used for recovery*
> This problem caused by jobs recoveryed from a retained incremental checkpoint 
> may reference files on this retained incremental checkpoint's shared 
> directory in subsequent checkpoints, even they are not in a same job 
> instance. The worst case is that the retained checkpoint will be referenced 
> one by one, forming a very long reference chain.This makes it difficult for 
> users to manage retained checkpoints. In fact, we have also suffered failures 
> caused by incorrect deletion of retained checkpoints.
> Although we can use the file handle in checkpoint metadata to figure out 
> which files can be deleted, but I think it is inappropriate to let users do 
> this.
>  * *Problem 2 : checkpoint not relocatable*
> Even if we can figure out all files referenced by a checkpoint, moving these 
> files will invalidate the checkpoint as well, because the metadata file 
> references absolute file paths.
> Since savepoint already be self-contained and relocatable (FLINK-5763​), why 
> don't we use savepoint just for migrate jobs to another place ? In addition 
> to the savepoint performance problem in the background description, a very 
> important reason is that the migration requirement may come from the failure 
> of the original cluster. In this case, there is no opportunity to trigger 
> savepoint.
> h1. Proposal
>  * *job's checkpoint directory (user-defined-checkpoint-dir/) contains 
> all their state files (self-contained)*
>  As far as I know, in the current status, only the subsequent checkpoints of 
> the jobs restored from the retained checkpoint violate this constraint. One 
> possible solution is to re-upload all shared files at the first incremental 
> checkpoint after the job started, but we need to discuss how to distinguish 
> between a new job instance and a restart.
>  * *use relative file path in checkpoint metadata (relocatable)*
> Change all file references in checkpoint metadata to the relative path 
> relative to the _metadata file, so we can copy 
> user-defined-checkpoint-dir/ to any other place.
>  
> BTW, this issue is so 

[jira] [Updated] (FLINK-14482) Bump up rocksdb version

2021-09-10 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-14482:
-
Attachment: image.png

> Bump up rocksdb version
> ---
>
> Key: FLINK-14482
> URL: https://issues.apache.org/jira/browse/FLINK-14482
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: 
> Screenshot-for-perf-regression-after-FRocksDB-upgrade-1.png, 
> Screenshot-for-perf-regression-after-FRocksDB-upgrade-2.png, image.png
>
>
> This JIRA aims at rebasing frocksdb to [newer 
> version|https://github.com/facebook/rocksdb/releases] of official RocksDB.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14482) Bump up rocksdb version

2021-09-10 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413132#comment-17413132
 ] 

Yun Tang edited comment on FLINK-14482 at 9/10/21, 11:47 AM:
-

According to the [nexmark|https://github.com/nexmark/nexmark], state-related 
queries could have at most 8% performance regression:

!image.png!


was (Author: yunta):
According to the nexmark, state-related queries could have at most 8% 
performance regression:

!image.png!

> Bump up rocksdb version
> ---
>
> Key: FLINK-14482
> URL: https://issues.apache.org/jira/browse/FLINK-14482
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: 
> Screenshot-for-perf-regression-after-FRocksDB-upgrade-1.png, 
> Screenshot-for-perf-regression-after-FRocksDB-upgrade-2.png, image.png
>
>
> This JIRA aims at rebasing frocksdb to [newer 
> version|https://github.com/facebook/rocksdb/releases] of official RocksDB.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14482) Bump up rocksdb version

2021-09-10 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413132#comment-17413132
 ] 

Yun Tang commented on FLINK-14482:
--

According to the nexmark, state-related queries could have at most 8% 
performance regression:

!image.png!

> Bump up rocksdb version
> ---
>
> Key: FLINK-14482
> URL: https://issues.apache.org/jira/browse/FLINK-14482
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: 
> Screenshot-for-perf-regression-after-FRocksDB-upgrade-1.png, 
> Screenshot-for-perf-regression-after-FRocksDB-upgrade-2.png, image.png
>
>
> This JIRA aims at rebasing frocksdb to [newer 
> version|https://github.com/facebook/rocksdb/releases] of official RocksDB.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-24210) Window related serializer should not return 0 as its serialized length

2021-09-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-24210:


Assignee: Jinzhong Li

> Window related serializer should not return 0 as its serialized length
> --
>
> Key: FLINK-24210
> URL: https://issues.apache.org/jira/browse/FLINK-24210
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System, Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Jinzhong Li
>Priority: Minor
>
> The TimeWindow serializer returns 0 as its serialized length, which is 
> certainly not correct.
> {code:java}
> public static class Serializer extends TypeSerializerSingleton<TimeWindow> {
> 
> @Override
> public int getLength() {
> return 0;
> }
> }
> {code}
> The current namespace serializers in the state backends do not depend on this 
> interface, so no obvious bug has ever been reported.
> Moreover, this bug also occurs in other window related serializers.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24210) Window related serializer should not return 0 as its serialized length

2021-09-08 Thread Yun Tang (Jira)
Yun Tang created FLINK-24210:


 Summary: Window related serializer should not return 0 as its 
serialized length
 Key: FLINK-24210
 URL: https://issues.apache.org/jira/browse/FLINK-24210
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System, Runtime / State Backends
Reporter: Yun Tang


The TimeWindow serializer returns 0 as its serialized length, which is 
certainly not correct.
{code:java}
public static class Serializer extends TypeSerializerSingleton<TimeWindow> {

@Override
public int getLength() {
return 0;
}
}
{code}
The current namespace serializers in the state backends do not depend on this 
interface, so no obvious bug has ever been reported.

Moreover, this bug also occurs in other window related serializers.
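
For illustration, since a TimeWindow is serialized as its two long fields (start and end), a fixed length declaration could look like the sketch below (illustrative only, not the committed fix):

{code:java}
@Override
public int getLength() {
    // A TimeWindow is written as two longs: start and end.
    return 2 * Long.BYTES;
}
{code}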



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23949) first incremental checkpoint after a savepoint will degenerate into a full checkpoint

2021-09-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-23949:
-
Fix Version/s: (was: 1.12.6)

> first incremental checkpoint after a savepoint will degenerate into a full 
> checkpoint
> -
>
> Key: FLINK-23949
> URL: https://issues.apache.org/jira/browse/FLINK-23949
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.11.4, 1.12.5, 1.13.2
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.3, 1.15.0
>
> Attachments: image-2021-08-25-00-59-05-779.png
>
>
> In RocksIncrementalSnapshotStrategy we will record the uploaded rocksdb files 
> corresponding to the checkpoint id,and clean it in 
> _CheckpointListener#notifyCheckpointComplete ._
> {code:java}
> @Override
> public void notifyCheckpointComplete(long completedCheckpointId) {
> synchronized (materializedSstFiles) {
> if (completedCheckpointId > lastCompletedCheckpointId) {
> materializedSstFiles
> .keySet()
> .removeIf(checkpointId -> checkpointId < 
> completedCheckpointId);
> lastCompletedCheckpointId = completedCheckpointId;
> }
> }
> }{code}
>  
> This works well without savepoint, but when a savepoint is completed, it will 
> clean up the _materializedSstFiles_ of the previous checkpoint. It leads to 
> the first checkpoint after the savepoint must upload all files in rocksdb.
> !image-2021-08-25-00-59-05-779.png|width=1188,height=163!
> Solving the problem is also very simple, I propose to clean 
> _materializedSstFiles_ and update  _lastCompletedCheckpointId_ only when 
> {color:#ff}_materializedSstFiles.keySet().contains(completedCheckpointId)_{color}
>  .
> If a _completedCheckpointId_ is not in _materializedSstFiles.keySet()_ , 
> there are only two cases:
> 1. It is a checkpoint but there is a checkpoint with larger id number 
> completed before it
> 2. It is a savepoint (savepoint not produce by 
> RocksIncrementalSnapshotStrategy)
> In either case we don’t need clean _materializedSstFiles_ and update  
> _lastCompletedCheckpointId_  anymore.
> [~yunta] , [~trohrmann] , I have submitted a pull request to solve this 
> problem, please evaluate whether it is appropriate.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-23949) first incremental checkpoint after a savepoint will degenerate into a full checkpoint

2021-09-08 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411740#comment-17411740
 ] 

Yun Tang edited comment on FLINK-23949 at 9/8/21, 8:24 AM:
---

merged
 master: d2007b74d543edb05af203f115459c51378867d3
 release-1.14: 602a177f639154e50f1db3b663ca5f282b6aa49b
 release-1.13: b0e532621745d32b218596beceb2e385e766a9a6
 -release-1.12: 5f12f4ce88a4e83473e778eb30742ab88d92bdb0-


was (Author: yunta):
merged
master: d2007b74d543edb05af203f115459c51378867d3
release-1.14: 602a177f639154e50f1db3b663ca5f282b6aa49b
release-1.13: b0e532621745d32b218596beceb2e385e766a9a6
release-1.12: 5f12f4ce88a4e83473e778eb30742ab88d92bdb0

> first incremental checkpoint after a savepoint will degenerate into a full 
> checkpoint
> -
>
> Key: FLINK-23949
> URL: https://issues.apache.org/jira/browse/FLINK-23949
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.11.4, 1.12.5, 1.13.2
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.12.6, 1.13.3, 1.15.0
>
> Attachments: image-2021-08-25-00-59-05-779.png
>
>
> In RocksIncrementalSnapshotStrategy we will record the uploaded rocksdb files 
> corresponding to the checkpoint id,and clean it in 
> _CheckpointListener#notifyCheckpointComplete ._
> {code:java}
> @Override
> public void notifyCheckpointComplete(long completedCheckpointId) {
> synchronized (materializedSstFiles) {
> if (completedCheckpointId > lastCompletedCheckpointId) {
> materializedSstFiles
> .keySet()
> .removeIf(checkpointId -> checkpointId < 
> completedCheckpointId);
> lastCompletedCheckpointId = completedCheckpointId;
> }
> }
> }{code}
>  
> This works well without savepoint, but when a savepoint is completed, it will 
> clean up the _materializedSstFiles_ of the previous checkpoint. It leads to 
> the first checkpoint after the savepoint must upload all files in rocksdb.
> !image-2021-08-25-00-59-05-779.png|width=1188,height=163!
> Solving the problem is also very simple, I propose to clean 
> _materializedSstFiles_ and update  _lastCompletedCheckpointId_ only when 
> {color:#ff}_materializedSstFiles.keySet().contains(completedCheckpointId)_{color}
>  .
> If a _completedCheckpointId_ is not in _materializedSstFiles.keySet()_ , 
> there are only two cases:
> 1. It is a checkpoint but there is a checkpoint with larger id number 
> completed before it
> 2. It is a savepoint (savepoint not produce by 
> RocksIncrementalSnapshotStrategy)
> In either case we don’t need clean _materializedSstFiles_ and update  
> _lastCompletedCheckpointId_  anymore.
> [~yunta] , [~trohrmann] , I have submitted a pull request to solve this 
> problem, please evaluate whether it is appropriate.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23949) first incremental checkpoint after a savepoint will degenerate into a full checkpoint

2021-09-08 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411773#comment-17411773
 ] 

Yun Tang commented on FLINK-23949:
--

[~dwysakowicz] Since the snapshot strategy interfaces have changed since 
Flink 1.13 and the added UT cannot run on Flink 1.12, I had to revert this fix 
on Flink 1.12 for now.

> first incremental checkpoint after a savepoint will degenerate into a full 
> checkpoint
> -
>
> Key: FLINK-23949
> URL: https://issues.apache.org/jira/browse/FLINK-23949
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.11.4, 1.12.5, 1.13.2
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.12.6, 1.13.3, 1.15.0
>
> Attachments: image-2021-08-25-00-59-05-779.png
>
>
> In RocksIncrementalSnapshotStrategy we will record the uploaded rocksdb files 
> corresponding to the checkpoint id,and clean it in 
> _CheckpointListener#notifyCheckpointComplete ._
> {code:java}
> @Override
> public void notifyCheckpointComplete(long completedCheckpointId) {
> synchronized (materializedSstFiles) {
> if (completedCheckpointId > lastCompletedCheckpointId) {
> materializedSstFiles
> .keySet()
> .removeIf(checkpointId -> checkpointId < 
> completedCheckpointId);
> lastCompletedCheckpointId = completedCheckpointId;
> }
> }
> }{code}
>  
> This works well without savepoint, but when a savepoint is completed, it will 
> clean up the _materializedSstFiles_ of the previous checkpoint. It leads to 
> the first checkpoint after the savepoint must upload all files in rocksdb.
> !image-2021-08-25-00-59-05-779.png|width=1188,height=163!
> Solving the problem is also very simple, I propose to clean 
> _materializedSstFiles_ and update  _lastCompletedCheckpointId_ only when 
> {color:#ff}_materializedSstFiles.keySet().contains(completedCheckpointId)_{color}
>  .
> If a _completedCheckpointId_ is not in _materializedSstFiles.keySet()_ , 
> there are only two cases:
> 1. It is a checkpoint but there is a checkpoint with larger id number 
> completed before it
> 2. It is a savepoint (savepoint not produce by 
> RocksIncrementalSnapshotStrategy)
> In either case we don’t need clean _materializedSstFiles_ and update  
> _lastCompletedCheckpointId_  anymore.
> [~yunta] , [~trohrmann] , I have submitted a pull request to solve this 
> problem, please evaluate whether it is appropriate.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23949) first incremental checkpoint after a savepoint will degenerate into a full checkpoint

2021-09-08 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411765#comment-17411765
 ] 

Yun Tang commented on FLINK-23949:
--

[~dwysakowicz] Thanks for your reminder, I am taking a look now.

> first incremental checkpoint after a savepoint will degenerate into a full 
> checkpoint
> -
>
> Key: FLINK-23949
> URL: https://issues.apache.org/jira/browse/FLINK-23949
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.11.4, 1.12.5, 1.13.2
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.12.6, 1.13.3, 1.15.0
>
> Attachments: image-2021-08-25-00-59-05-779.png
>
>
> In RocksIncrementalSnapshotStrategy we will record the uploaded rocksdb files 
> corresponding to the checkpoint id,and clean it in 
> _CheckpointListener#notifyCheckpointComplete ._
> {code:java}
> @Override
> public void notifyCheckpointComplete(long completedCheckpointId) {
> synchronized (materializedSstFiles) {
> if (completedCheckpointId > lastCompletedCheckpointId) {
> materializedSstFiles
> .keySet()
> .removeIf(checkpointId -> checkpointId < 
> completedCheckpointId);
> lastCompletedCheckpointId = completedCheckpointId;
> }
> }
> }{code}
>  
> This works well without savepoint, but when a savepoint is completed, it will 
> clean up the _materializedSstFiles_ of the previous checkpoint. It leads to 
> the first checkpoint after the savepoint must upload all files in rocksdb.
> !image-2021-08-25-00-59-05-779.png|width=1188,height=163!
> Solving the problem is also very simple, I propose to clean 
> _materializedSstFiles_ and update  _lastCompletedCheckpointId_ only when 
> {color:#ff}_materializedSstFiles.keySet().contains(completedCheckpointId)_{color}
>  .
> If a _completedCheckpointId_ is not in _materializedSstFiles.keySet()_ , 
> there are only two cases:
> 1. It is a checkpoint but there is a checkpoint with larger id number 
> completed before it
> 2. It is a savepoint (savepoint not produce by 
> RocksIncrementalSnapshotStrategy)
> In either case we don’t need clean _materializedSstFiles_ and update  
> _lastCompletedCheckpointId_  anymore.
> [~yunta] , [~trohrmann] , I have submitted a pull request to solve this 
> problem, please evaluate whether it is appropriate.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-23949) first incremental checkpoint after a savepoint will degenerate into a full checkpoint

2021-09-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-23949.
--
Resolution: Fixed

merged
master: d2007b74d543edb05af203f115459c51378867d3
release-1.14: 602a177f639154e50f1db3b663ca5f282b6aa49b
release-1.13: b0e532621745d32b218596beceb2e385e766a9a6
release-1.12: 5f12f4ce88a4e83473e778eb30742ab88d92bdb0

> first incremental checkpoint after a savepoint will degenerate into a full 
> checkpoint
> -
>
> Key: FLINK-23949
> URL: https://issues.apache.org/jira/browse/FLINK-23949
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.11.4, 1.12.5, 1.13.2
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.12.6, 1.13.3, 1.15.0
>
> Attachments: image-2021-08-25-00-59-05-779.png
>
>
> In RocksIncrementalSnapshotStrategy we will record the uploaded rocksdb files 
> corresponding to the checkpoint id,and clean it in 
> _CheckpointListener#notifyCheckpointComplete ._
> {code:java}
> @Override
> public void notifyCheckpointComplete(long completedCheckpointId) {
> synchronized (materializedSstFiles) {
> if (completedCheckpointId > lastCompletedCheckpointId) {
> materializedSstFiles
> .keySet()
> .removeIf(checkpointId -> checkpointId < 
> completedCheckpointId);
> lastCompletedCheckpointId = completedCheckpointId;
> }
> }
> }{code}
>  
> This works well without savepoint, but when a savepoint is completed, it will 
> clean up the _materializedSstFiles_ of the previous checkpoint. It leads to 
> the first checkpoint after the savepoint must upload all files in rocksdb.
> !image-2021-08-25-00-59-05-779.png|width=1188,height=163!
> Solving the problem is also very simple, I propose to clean 
> _materializedSstFiles_ and update  _lastCompletedCheckpointId_ only when 
> {color:#ff}_materializedSstFiles.keySet().contains(completedCheckpointId)_{color}
>  .
> If a _completedCheckpointId_ is not in _materializedSstFiles.keySet()_ , 
> there are only two cases:
> 1. It is a checkpoint but there is a checkpoint with larger id number 
> completed before it
> 2. It is a savepoint (savepoint not produce by 
> RocksIncrementalSnapshotStrategy)
> In either case we don’t need clean _materializedSstFiles_ and update  
> _lastCompletedCheckpointId_  anymore.
> [~yunta] , [~trohrmann] , I have submitted a pull request to solve this 
> problem, please evaluate whether it is appropriate.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23949) first incremental checkpoint after a savepoint will degenerate into a full checkpoint

2021-09-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-23949:
-
Fix Version/s: 1.15.0
   1.13.3
   1.12.6
   1.14.0

> first incremental checkpoint after a savepoint will degenerate into a full 
> checkpoint
> -
>
> Key: FLINK-23949
> URL: https://issues.apache.org/jira/browse/FLINK-23949
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.11.4, 1.12.5, 1.13.2
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.12.6, 1.13.3, 1.15.0
>
> Attachments: image-2021-08-25-00-59-05-779.png
>
>
> In RocksIncrementalSnapshotStrategy we will record the uploaded rocksdb files 
> corresponding to the checkpoint id,and clean it in 
> _CheckpointListener#notifyCheckpointComplete ._
> {code:java}
> @Override
> public void notifyCheckpointComplete(long completedCheckpointId) {
> synchronized (materializedSstFiles) {
> if (completedCheckpointId > lastCompletedCheckpointId) {
> materializedSstFiles
> .keySet()
> .removeIf(checkpointId -> checkpointId < 
> completedCheckpointId);
> lastCompletedCheckpointId = completedCheckpointId;
> }
> }
> }{code}
>  
> This works well without savepoint, but when a savepoint is completed, it will 
> clean up the _materializedSstFiles_ of the previous checkpoint. It leads to 
> the first checkpoint after the savepoint must upload all files in rocksdb.
> !image-2021-08-25-00-59-05-779.png|width=1188,height=163!
> Solving the problem is also very simple, I propose to clean 
> _materializedSstFiles_ and update  _lastCompletedCheckpointId_ only when 
> {color:#ff}_materializedSstFiles.keySet().contains(completedCheckpointId)_{color}
>  .
> If a _completedCheckpointId_ is not in _materializedSstFiles.keySet()_ , 
> there are only two cases:
> 1. It is a checkpoint but there is a checkpoint with larger id number 
> completed before it
> 2. It is a savepoint (savepoint not produce by 
> RocksIncrementalSnapshotStrategy)
> In either case we don’t need clean _materializedSstFiles_ and update  
> _lastCompletedCheckpointId_  anymore.
> [~yunta] , [~trohrmann] , I have submitted a pull request to solve this 
> problem, please evaluate whether it is appropriate.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24148) Add bloom filter policy option in RocksDBConfiguredOptions

2021-09-07 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411661#comment-17411661
 ] 

Yun Tang commented on FLINK-24148:
--

merged in master:
371f247a45adac24f7fbd4e92782cc4dc0b31dc3

> Add bloom filter policy option in RocksDBConfiguredOptions
> --
>
> Key: FLINK-24148
> URL: https://issues.apache.org/jira/browse/FLINK-24148
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.13.2, 1.14.1
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1
>
>
> A bloom filter can efficiently improve read performance on RocksDB, especially 
> for reads among L0 files. (For more details see 
> https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24148) Add bloom filter policy option in RocksDBConfiguredOptions

2021-09-07 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-24148:
-
Fix Version/s: 1.14.1
   1.15.0

> Add bloom filter policy option in RocksDBConfiguredOptions
> --
>
> Key: FLINK-24148
> URL: https://issues.apache.org/jira/browse/FLINK-24148
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.13.2, 1.14.1
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1
>
>
> A bloom filter can efficiently improve read performance on RocksDB, especially 
> for reads among L0 files. (For more details see 
> https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24149) Make checkpoint relocatable

2021-09-07 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17410943#comment-17410943
 ] 

Yun Tang commented on FLINK-24149:
--

[~Feifan Wang] That's why I think we should take all cases into consideration 
before providing one possible solution. I prefer to make checkpoints relocatable 
in one step or with one tool to avoid user misuse, such as migrating an 
incremental checkpoint that spans multiple previous job instances. Thus, I 
prefer a solution like the state-processor-api to move a checkpoint to another 
folder. What do you think, or do you have a better idea?

> Make checkpoint relocatable
> ---
>
> Key: FLINK-24149
> URL: https://issues.apache.org/jira/browse/FLINK-24149
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
>
> h3. Backgroud
> FLINK-5763 proposal make savepoint relocatable, checkpoint has similar 
> requirements. For example, to migrate jobs to other HDFS clusters, although 
> it can be achieved through a savepoint, but we prefer to use persistent 
> checkpoints, especially RocksDBStateBackend incremental checkpoints have 
> better performance than savepoint during snapshot and restore.
>  
> FLINK-8531 standardized directory layout :
> {code:java}
> /user-defined-checkpoint-dir
> |
> + 1b080b6e710aabbef8993ab18c6de98b (job's ID)
> |
> + --shared/
> + --taskowned/
> + --chk-1/
> + --chk-2/
> + --chk-3/
> ...
> {code}
>  * State backend will create a subdirectory with the job's ID that will 
> contain the actual checkpoints, such as: 
> user-defined-checkpoint-dir/1b080b6e710aabbef8993ab18c6de98b/
>  * Each checkpoint individually will store all its files in a subdirectory 
> that includes the checkpoint number, such as: 
> user-defined-checkpoint-dir/1b080b6e710aabbef8993ab18c6de98b/chk-3/
>  * Files shared between checkpoints will be stored in the shared/ directory 
> in the same parent directory as the separate checkpoint directory, such as: 
> user-defined-checkpoint-dir/1b080b6e710aabbef8993ab18c6de98b/shared/
>  * Similar to shared files, files owned strictly by tasks will be stored in 
> the taskowned/ directory in the same parent directory as the separate 
> checkpoint directory, such as: 
> user-defined-checkpoint-dir/1b080b6e710aabbef8993ab18c6de98b/taskowned/
> h3. Proposal
> Since the individually checkpoint directory does not contain complete state 
> data, we cannot make it relocatable, but its parent directory can. The only 
> work left is make the metadata file references relative file paths.
> I proposal make these changes to _*FsCheckpointStateOutputStream*_ :
>  * introduce _*checkpointDirectory*_ field, and remove *_allowRelativePaths_* 
> field
>  * introduce *_entropyInjecting_* field
>  * *_closeAndGetHandle()_* return _*RelativeFileStateHandle*_ with relative 
> path base on _*checkpointDirectory*_ (except entropy injecting file system)
> [~yunta], [~trohrmann] , I verified this in our environment , and submitted a 
> pull request to accomplish this feature. Please help evaluate whether it is 
> appropriate.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-23983) JVM crash when running RocksDBStateBackend tests

2021-09-06 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-23983.
--
Resolution: Fixed

Merged
master: 4f5978fd4f53428b6af85f487f3f2abbf2f590fd
release-1.14: 7bf42400d99a5d3421d85fa52309f7c9a870537a

> JVM crash when running RocksDBStateBackend tests
> 
>
> Key: FLINK-23983
> URL: https://issues.apache.org/jira/browse/FLINK-23983
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Tests
>Affects Versions: 1.14.0
>Reporter: Xintong Song
>Assignee: Yun Tang
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.14.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=22855=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=11131
> You would need to compare the mvn logs "Running xxx" with "Test run xxx in 
> xxx" to find out the unfinished test.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23983) JVM crash when running RocksDBStateBackend tests

2021-09-06 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-23983:
-
Fix Version/s: 1.15.0

> JVM crash when running RocksDBStateBackend tests
> 
>
> Key: FLINK-23983
> URL: https://issues.apache.org/jira/browse/FLINK-23983
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Tests
>Affects Versions: 1.14.0
>Reporter: Xintong Song
>Assignee: Yun Tang
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.14.0, 1.15.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=22855=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=11131
> You would need to compare the mvn logs "Running xxx" with "Test run xxx in 
> xxx" to find out the unfinished test.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file

2021-09-05 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17410352#comment-17410352
 ] 

Yun Tang commented on FLINK-23886:
--

[~pnowojski] I think FLINK-19741 is not the same problem we met. I checked and 
re-created the RocksDB instance reported by our customer without involving any 
raw keyed state stream. The unexpected key really existed in one SST file. Not 
sure whether [~qingru zhang] has a case with a raw keyed state stream.

Since the column family is the base unit for isolating data physically, it is 
really weird to see a key from another column family. I have opened [an 
issue|https://github.com/facebook/rocksdb/issues/8718] in the RocksDB community 
and hope for some feedback.

> An exception is thrown out when recover job timers from checkpoint file
> ---
>
> Key: FLINK-23886
> URL: https://issues.apache.org/jira/browse/FLINK-23886
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.10.0, 1.11.3
>Reporter: JING ZHANG
>Priority: Major
> Attachments: image-2021-08-25-16-38-04-023.png, 
> image-2021-08-25-16-38-12-308.png, image-2021-08-25-17-06-29-806.png, 
> image-2021-08-25-17-07-38-327.png
>
>
> A user report the bug in the [mailist. 
> |http://mail-archives.apache.org/mod_mbox/flink-user/202108.mbox/%3ccakmsf43j14nkjmgjuy4dh5qn2vbjtw4tfh4pmmuyvcvfhgf...@mail.gmail.com%3E]I
>  paste the content here.
> Setup Specifics:
>  Version: 1.6.2
>  RocksDB Map State
>  Timers stored in rocksdb
>   
>  When we have this job running for long periods of time like > 30 days, if 
> for some reason the job restarts, we encounter "Error while deserializing the 
> element". Is this a known issue fixed in later versions? I see some changes 
> to code for FLINK-10175, but we don't use any queryable state 
>   
>  Below is the stack trace
>   
>  org.apache.flink.util.FlinkRuntimeException: Error while deserializing the 
> element.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
> at 
> org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.(KeyGroupPartitionedPriorityQueue.java:89)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)
> at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
> at org.apache.flink.types.StringValue.readString(StringValue.java:769)
> at 
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
> at 
> 

[jira] [Commented] (FLINK-24149) Make checkpoint relocatable

2021-09-05 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17410315#comment-17410315
 ] 

Yun Tang commented on FLINK-24149:
--

[~Feifan Wang] An incremental checkpoint might reference files with different path 
prefixes, e.g. one shared file comes from 
{{/user-defined-checkpoint-dir/job_id_1/shared}} and another one from 
{{/user-defined-checkpoint-dir/job_id_2/shared}}. Could this still satisfy the 
relocatable case?

> Make checkpoint relocatable
> ---
>
> Key: FLINK-24149
> URL: https://issues.apache.org/jira/browse/FLINK-24149
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
>
> h3. Backgroud
> FLINK-5763 proposal make savepoint relocatable, checkpoint has similar 
> requirements. For example, to migrate jobs to other HDFS clusters, although 
> it can be achieved through a savepoint, but we prefer to use persistent 
> checkpoints, especially RocksDBStateBackend incremental checkpoints have 
> better performance than savepoint during snapshot and restore.
>  
> FLINK-8531 standardized directory layout :
> {code:java}
> /user-defined-checkpoint-dir
> |
> + 1b080b6e710aabbef8993ab18c6de98b (job's ID)
> |
> + --shared/
> + --taskowned/
> + --chk-1/
> + --chk-2/
> + --chk-3/
> ...
> {code}
>  * State backend will create a subdirectory with the job's ID that will 
> contain the actual checkpoints, such as: 
> user-defined-checkpoint-dir/1b080b6e710aabbef8993ab18c6de98b/
>  * Each checkpoint individually will store all its files in a subdirectory 
> that includes the checkpoint number, such as: 
> user-defined-checkpoint-dir/1b080b6e710aabbef8993ab18c6de98b/chk-3/
>  * Files shared between checkpoints will be stored in the shared/ directory 
> in the same parent directory as the separate checkpoint directory, such as: 
> user-defined-checkpoint-dir/1b080b6e710aabbef8993ab18c6de98b/shared/
>  * Similar to shared files, files owned strictly by tasks will be stored in 
> the taskowned/ directory in the same parent directory as the separate 
> checkpoint directory, such as: 
> user-defined-checkpoint-dir/1b080b6e710aabbef8993ab18c6de98b/taskowned/
> h3. Proposal
> Since the individually checkpoint directory does not contain complete state 
> data, we cannot make it relocatable, but its parent directory can. The only 
> work left is make the metadata file references relative file paths.
> I proposal make these changes to _*FsCheckpointStateOutputStream*_ :
>  * introduce _*checkpointDirectory*_ field, and remove *_allowRelativePaths_* 
> field
>  * introduce *_entropyInjecting_* field
>  * *_closeAndGetHandle()_* return _*RelativeFileStateHandle*_ with relative 
> path base on _*checkpointDirectory*_ (except entropy injecting file system)
> [~yunta], [~trohrmann] , I verified this in our environment , and submitted a 
> pull request to accomplish this feature. Please help evaluate whether it is 
> appropriate.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24149) Make checkpoint relocatable

2021-09-05 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17410136#comment-17410136
 ] 

Yun Tang commented on FLINK-24149:
--

Before further discussion or review, I have several questions:
 # Can this feature support existing non-self-contained full checkpoints?
 # Can this feature support existing non-self-contained incremental 
checkpoints?
 # Can this feature support existing incremental checkpoints which might contain 
files from multiple previous checkpoint directories?

> Make checkpoint relocatable
> ---
>
> Key: FLINK-24149
> URL: https://issues.apache.org/jira/browse/FLINK-24149
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
>
> h3. Backgroud
> FLINK-5763 proposal make savepoint relocatable, checkpoint has similar 
> requirements. For example, to migrate jobs to other HDFS clusters, although 
> it can be achieved through a savepoint, but we prefer to use persistent 
> checkpoints, especially RocksDBStateBackend incremental checkpoints have 
> better performance than savepoint during snapshot and restore.
>  
> FLINK-8531 standardized directory layout :
> {code:java}
> /user-defined-checkpoint-dir
> |
> + 1b080b6e710aabbef8993ab18c6de98b (job's ID)
> |
> + --shared/
> + --taskowned/
> + --chk-1/
> + --chk-2/
> + --chk-3/
> ...
> {code}
>  * State backend will create a subdirectory with the job's ID that will 
> contain the actual checkpoints, such as: 
> user-defined-checkpoint-dir/1b080b6e710aabbef8993ab18c6de98b/
>  * Each checkpoint individually will store all its files in a subdirectory 
> that includes the checkpoint number, such as: 
> user-defined-checkpoint-dir/1b080b6e710aabbef8993ab18c6de98b/chk-3/
>  * Files shared between checkpoints will be stored in the shared/ directory 
> in the same parent directory as the separate checkpoint directory, such as: 
> user-defined-checkpoint-dir/1b080b6e710aabbef8993ab18c6de98b/shared/
>  * Similar to shared files, files owned strictly by tasks will be stored in 
> the taskowned/ directory in the same parent directory as the separate 
> checkpoint directory, such as: 
> user-defined-checkpoint-dir/1b080b6e710aabbef8993ab18c6de98b/taskowned/
> h3. Proposal
> Since the individually checkpoint directory does not contain complete state 
> data, we cannot make it relocatable, but its parent directory can. The only 
> work left is make the metadata file references relative file paths.
> I proposal make these changes to _*FsCheckpointStateOutputStream*_ :
>  * introduce _*checkpointDirectory*_ field, and remove *_allowRelativePaths_* 
> field
>  * introduce *_entropyInjecting_* field
>  * *_closeAndGetHandle()_* return _*RelativeFileStateHandle*_ with relative 
> path base on _*checkpointDirectory*_ (except entropy injecting file system)
> [~yunta], [~trohrmann] , I verified this in our environment , and submitted a 
> pull request to accomplish this feature. Please help evaluate whether it is 
> appropriate.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-23822) Test config state ttl in Python DataStream API

2021-09-03 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-23822.
--
Resolution: Fixed

> Test config state ttl in Python DataStream API
> --
>
> Key: FLINK-23822
> URL: https://issues.apache.org/jira/browse/FLINK-23822
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Huang Xingbo
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.14.0
>
>
> The newly feature allows users to config the state ttl in Python DataStream 
> API.
> In order to test this new feature I recommend to follow the documentation[1]
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23822) Test config state ttl in Python DataStream API

2021-09-03 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17409585#comment-17409585
 ] 

Yun Tang commented on FLINK-23822:
--

Verified the state TTL in the Python API by following the docs.

Found two problems and created related sub-tasks.

After resolving all problems, it proved to work as expected on both 
HashMapStateBackend and EmbeddedRocksDBStateBackend.

> Test config state ttl in Python DataStream API
> --
>
> Key: FLINK-23822
> URL: https://issues.apache.org/jira/browse/FLINK-23822
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Huang Xingbo
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.14.0
>
>
> The newly feature allows users to config the state ttl in Python DataStream 
> API.
> In order to test this new feature I recommend to follow the documentation[1]
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24148) Add bloom filter policy option in RocksDBConfiguredOptions

2021-09-03 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17409484#comment-17409484
 ] 

Yun Tang commented on FLINK-24148:
--

The Bloom filter takes effect in production environments. [~wind_ljy] Already 
assigned to you, please go ahead.
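
For illustration, here is a minimal sketch of how a bloom filter can already be enabled today 
through a custom options factory (the class name is made up, and it assumes the bundled 
RocksDB version exposes {{BlockBasedTableConfig#setFilterPolicy}}); the proposed config 
option would remove the need for this boilerplate:
{code:java}
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;

import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

import java.util.Collection;

/** Hypothetical factory that enables a 10-bits-per-key bloom filter for all column families. */
public class BloomFilterOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions; // DBOptions stay untouched
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        BloomFilter bloomFilter = new BloomFilter(10, false);
        handlesToClose.add(bloomFilter); // let Flink close the native handle on shutdown
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig().setFilterPolicy(bloomFilter);
        return currentOptions.setTableFormatConfig(tableConfig);
    }
}
{code}
Such a factory can be registered on the state backend, e.g. via 
{{EmbeddedRocksDBStateBackend#setRocksDBOptions}}.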

> Add bloom filter policy option in RocksDBConfiguredOptions
> --
>
> Key: FLINK-24148
> URL: https://issues.apache.org/jira/browse/FLINK-24148
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.13.2, 1.14.1
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Minor
>
> Bloom filter can efficiently enhance the read on RocksDB, especially for the 
> reading among L0 files. (more details see 
> https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-24148) Add bloom filter policy option in RocksDBConfiguredOptions

2021-09-03 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-24148:


Assignee: Jiayi Liao

> Add bloom filter policy option in RocksDBConfiguredOptions
> --
>
> Key: FLINK-24148
> URL: https://issues.apache.org/jira/browse/FLINK-24148
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.13.2, 1.14.1
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Minor
>
> Bloom filter can efficiently enhance the read on RocksDB, especially for the 
> reading among L0 files. (more details see 
> https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23983) JVM crash when running RocksDBStateBackend tests

2021-09-02 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-23983:
-
Component/s: Tests

> JVM crash when running RocksDBStateBackend tests
> 
>
> Key: FLINK-23983
> URL: https://issues.apache.org/jira/browse/FLINK-23983
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Tests
>Affects Versions: 1.14.0
>Reporter: Xintong Song
>Assignee: Yun Tang
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.14.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=22855=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=11131
> You would need to compare the mvn logs "Running xxx" with "Test run xxx in 
> xxx" to find out the unfinished test.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-23983) JVM crash when running RocksDBStateBackend tests

2021-09-02 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-23983:


Assignee: Yun Tang

> JVM crash when running RocksDBStateBackend tests
> 
>
> Key: FLINK-23983
> URL: https://issues.apache.org/jira/browse/FLINK-23983
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.0
>Reporter: Xintong Song
>Assignee: Yun Tang
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.14.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=22855=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=11131
> You would need to compare the mvn logs "Running xxx" with "Test run xxx in 
> xxx" to find out the unfinished test.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23983) JVM crash when running RocksDBStateBackend tests

2021-09-02 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408575#comment-17408575
 ] 

Yun Tang commented on FLINK-23983:
--

I could reproduce it locally by running the RocksDBStateBackend tests repeatedly. 
Since we bumped RocksDB recently, and no tests obviously changed under the 
flink-statebackend-rocksdb module, I first suspected that some behavior had 
changed in RocksDB itself. It took me some time to figure out that this is likely 
caused by a change in {{StateBackendTestBase}}, which altered the previous tests 
by creating a new keyed state backend without disposing the older one (refer to 
https://github.com/apache/flink/commit/e8daf67ce5096da791e21d0915848c78c395822d 
cc @roman [~rkhachatryan]).
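
As a rough sketch of the missing cleanup (this is not the actual test code; 
{{createKeyedBackend()}} stands in for whatever helper the test base uses to build a 
RocksDB keyed backend):
{code:java}
// Sketch only: dispose the first backend before a second one is created,
// otherwise the native RocksDB instance and its resources leak across test runs.
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(); // hypothetical helper
try {
    // ... exercise state against the first backend ...
} finally {
    backend.dispose(); // releases the native RocksDB instance and its local directory
}
// Only afterwards build the second backend, e.g. for the restore part of the test.
AbstractKeyedStateBackend<Integer> restoredBackend = createKeyedBackend();
{code}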

> JVM crash when running RocksDBStateBackend tests
> 
>
> Key: FLINK-23983
> URL: https://issues.apache.org/jira/browse/FLINK-23983
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.0
>Reporter: Xintong Song
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.14.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=22855=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=11131
> You would need to compare the mvn logs "Running xxx" with "Test run xxx in 
> xxx" to find out the unfinished test.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-24106) Remove notice that python does not support state TTL

2021-09-01 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-24106.
--
Resolution: Fixed

merged
master: aa2dddcf1cf9fb6241c4840f9c907530b76dc02a
release-1.14: 297795fa9d3977a3930a4de491a0d15fb350ad19

> Remove notice that python does not support state TTL
> 
>
> Key: FLINK-24106
> URL: https://issues.apache.org/jira/browse/FLINK-24106
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Current documentation 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/
>  still say that "State TTL is still not supported in PyFlink DataStream API." 
> We should fix that from Flink-1.14.0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-24106) Remove notice that python does not support state TTL

2021-09-01 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-24106:


Assignee: Yun Tang

> Remove notice that python does not support state TTL
> 
>
> Key: FLINK-24106
> URL: https://issues.apache.org/jira/browse/FLINK-24106
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Blocker
> Fix For: 1.14.0
>
>
> Current documentation 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/
>  still say that "State TTL is still not supported in PyFlink DataStream API." 
> We should fix that from Flink-1.14.0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24106) Remove notice that python does not support state TTL

2021-09-01 Thread Yun Tang (Jira)
Yun Tang created FLINK-24106:


 Summary: Remove notice that python does not support state TTL
 Key: FLINK-24106
 URL: https://issues.apache.org/jira/browse/FLINK-24106
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Yun Tang
 Fix For: 1.14.0


The current documentation 
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/
 still says that "State TTL is still not supported in PyFlink DataStream API." 
We should fix that for Flink 1.14.0.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24105) state TTL might not take effect for pyflink

2021-09-01 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-24105:
-
Parent: FLINK-23822
Issue Type: Sub-task  (was: Bug)

> state TTL might not take effect for pyflink
> ---
>
> Key: FLINK-24105
> URL: https://issues.apache.org/jira/browse/FLINK-24105
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Runtime / State Backends
>Reporter: Yun Tang
>Priority: Blocker
> Fix For: 1.14.0
>
>
> Since pyflink has its own data cache on python side, it might still read the 
> data from python side even TTL has expired.
> Scripts below could reproduce this:
> {code:python}
> from pyflink.common.time import Time
> from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig, 
> ListStateDescriptor, MapStateDescriptor
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment, 
> TimeCharacteristic, RuntimeContext, KeyedProcessFunction, \
> EmbeddedRocksDBStateBackend
> import time
> from datetime import datetime
> def test_keyed_process_function_with_state():
> env = StreamExecutionEnvironment.get_execution_environment()
> env.get_config().set_auto_watermark_interval(2000)
> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> env.set_state_backend(EmbeddedRocksDBStateBackend())
> data_stream = env.from_collection([(1, 'hi', '1603708211000'),
> (3, 'hi', '1603708226000'),
> (10, 'hi', '1603708226000'),
> (6, 'hello', '1603708293000')],
>type_info=Types.ROW([Types.INT(), 
> Types.STRING(),
> 
> Types.STRING()]))
> class MyProcessFunction(KeyedProcessFunction):
> def __init__(self):
> self.value_state = None
> self.list_state = None
> self.map_state = None
> def open(self, runtime_context: RuntimeContext):
> state_ttl_config = StateTtlConfig \
> .new_builder(Time.seconds(1)) \
> .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
> .never_return_expired() \
> .build()
> value_state_descriptor = ValueStateDescriptor('value_state', 
> Types.INT())
> value_state_descriptor.enable_time_to_live(state_ttl_config)
> self.value_state = 
> runtime_context.get_state(value_state_descriptor)
> list_state_descriptor = ListStateDescriptor('list_state', 
> Types.INT())
> list_state_descriptor.enable_time_to_live(state_ttl_config)
> self.list_state = 
> runtime_context.get_list_state(list_state_descriptor)
> map_state_descriptor = MapStateDescriptor('map_state', 
> Types.INT(), Types.STRING())
> map_state_descriptor.enable_time_to_live(state_ttl_config)
> self.map_state = 
> runtime_context.get_map_state(map_state_descriptor)
> def process_element(self, value, ctx):
> time.sleep(20)
> current_value = self.value_state.value()
> self.value_state.update(value[0])
> current_list = [_ for _ in self.list_state.get()]
> self.list_state.add(value[0])
> map_entries_string = []
> for k, v in self.map_state.items():
> map_entries_string.append(str(k) + ': ' + str(v))
> map_entries_string = '{' + ', '.join(map_entries_string) + '}'
> self.map_state.put(value[0], value[1])
> current_key = ctx.get_current_key()
> yield "time: {}, current key: {}, current value state: {}, 
> current list state: {}, " \
>   "current map state: {}, current value: 
> {}".format(str(datetime.now().time()),
> 
> str(current_key),
> 
> str(current_value),
> 
> str(current_list),
> 
> map_entries_string,
> 
> str(value))
> def on_timer(self, timestamp, ctx):
> pass
> data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
> .process(MyProcessFunction(), output_type=Types.STRING()) \
> .print()
> env.execute('test time stamp assigner with keyed process function')
> if __name__ == '__main__':
> test_keyed_process_function_with_state()
> {code}



--
This 

[jira] [Created] (FLINK-24105) state TTL might not take effect for pyflink

2021-09-01 Thread Yun Tang (Jira)
Yun Tang created FLINK-24105:


 Summary: state TTL might not take effect for pyflink
 Key: FLINK-24105
 URL: https://issues.apache.org/jira/browse/FLINK-24105
 Project: Flink
  Issue Type: Bug
  Components: API / Python, Runtime / State Backends
Reporter: Yun Tang
 Fix For: 1.14.0


Since PyFlink has its own data cache on the Python side, it might still read the 
data from that cache even after the TTL has expired.

Scripts below could reproduce this:
{code:python}
from pyflink.common.time import Time
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig, ListStateDescriptor, MapStateDescriptor
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, RuntimeContext, KeyedProcessFunction, \
    EmbeddedRocksDBStateBackend
import time
from datetime import datetime


def test_keyed_process_function_with_state():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.get_config().set_auto_watermark_interval(2000)
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    env.set_state_backend(EmbeddedRocksDBStateBackend())
    data_stream = env.from_collection([(1, 'hi', '1603708211000'),
                                       (3, 'hi', '1603708226000'),
                                       (10, 'hi', '1603708226000'),
                                       (6, 'hello', '1603708293000')],
                                      type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))

    class MyProcessFunction(KeyedProcessFunction):

        def __init__(self):
            self.value_state = None
            self.list_state = None
            self.map_state = None

        def open(self, runtime_context: RuntimeContext):
            state_ttl_config = StateTtlConfig \
                .new_builder(Time.seconds(1)) \
                .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
                .never_return_expired() \
                .build()
            value_state_descriptor = ValueStateDescriptor('value_state', Types.INT())
            value_state_descriptor.enable_time_to_live(state_ttl_config)
            self.value_state = runtime_context.get_state(value_state_descriptor)
            list_state_descriptor = ListStateDescriptor('list_state', Types.INT())
            list_state_descriptor.enable_time_to_live(state_ttl_config)
            self.list_state = runtime_context.get_list_state(list_state_descriptor)
            map_state_descriptor = MapStateDescriptor('map_state', Types.INT(), Types.STRING())
            map_state_descriptor.enable_time_to_live(state_ttl_config)
            self.map_state = runtime_context.get_map_state(map_state_descriptor)

        def process_element(self, value, ctx):
            time.sleep(20)
            current_value = self.value_state.value()
            self.value_state.update(value[0])
            current_list = [_ for _ in self.list_state.get()]
            self.list_state.add(value[0])
            map_entries_string = []
            for k, v in self.map_state.items():
                map_entries_string.append(str(k) + ': ' + str(v))
            map_entries_string = '{' + ', '.join(map_entries_string) + '}'
            self.map_state.put(value[0], value[1])
            current_key = ctx.get_current_key()
            yield "time: {}, current key: {}, current value state: {}, current list state: {}, " \
                  "current map state: {}, current value: {}".format(str(datetime.now().time()),
                                                                    str(current_key),
                                                                    str(current_value),
                                                                    str(current_list),
                                                                    map_entries_string,
                                                                    str(value))

        def on_timer(self, timestamp, ctx):
            pass

    data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
        .process(MyProcessFunction(), output_type=Types.STRING()) \
        .print()
    env.execute('test time stamp assigner with keyed process function')


if __name__ == '__main__':
    test_keyed_process_function_with_state()
{code}






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23519) Aggregate State Backend Latency by State Level

2021-08-30 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-23519:
-
Fix Version/s: (was: 1.14.0)
   1.14.1
   1.15.0
   1.13.3

> Aggregate State Backend Latency by State Level
> --
>
> Key: FLINK-23519
> URL: https://issues.apache.org/jira/browse/FLINK-23519
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / State Backends
>Affects Versions: 1.13.0
>Reporter: Mason Chen
>Assignee: Yun Tang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.13.3, 1.15.0, 1.14.1
>
>
> To make metrics aggregation easier, there should be a config to expose 
> something like `state.backend.rocksdb.metrics.column-family-as-variable` that 
> rocksdb provides to do aggregation across column families 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-metrics-column-family-as-variable]).
>  
> In this case of state backend latency, the variable exposed would be state 
> level instead column family. This makes it easier to aggregate by the various 
> state levels that are reported.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24046) Refactor the relationship between PredefinedOptions and RocksDBConfigurableOptions

2021-08-29 Thread Yun Tang (Jira)
Yun Tang created FLINK-24046:


 Summary: Refactor the relationship between PredefinedOptions and 
RocksDBConfigurableOptions
 Key: FLINK-24046
 URL: https://issues.apache.org/jira/browse/FLINK-24046
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Yun Tang
Assignee: Zakelly Lan
 Fix For: 1.14.1


RocksDBConfigurableOptions mainly focuses on the settings of DBOptions and 
ColumnFamilyOptions. The original design of this class was to let users 
configure RocksDB via configuration options instead of a programmatically 
implemented RocksDBOptionsFactory.
To keep the change minimal, the original options in RocksDBConfigurableOptions had 
no default values, so that DefaultConfigurableOptionsFactory would not change 
anything and behaved just as before.
However, options without default values make their meaning unclear to users, so we 
should consider changing the relationship between the two classes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23791) Enable RocksDB log again

2021-08-29 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-23791:
-
Fix Version/s: (was: 1.14.0)
   1.14.1

> Enable RocksDB log again
> 
>
> Key: FLINK-23791
> URL: https://issues.apache.org/jira/browse/FLINK-23791
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.1
>
>
> FLINK-15068 disabled the RocksDB's local LOG due to previous RocksDB cannot 
> limit the local log files.
> After we upgraded to newer RocksDB version, we can then enable RocksDB log 
> again.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend

2021-08-29 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17406527#comment-17406527
 ] 

Yun Tang commented on FLINK-23721:
--

[~lmagics] Could you share a code example that reproduces this in a local 
environment, so that we can run it directly?

> Flink SQL state TTL has no effect when using non-incremental 
> RocksDBStateBackend
> 
>
> Key: FLINK-23721
> URL: https://issues.apache.org/jira/browse/FLINK-23721
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Table SQL / Runtime
>Affects Versions: 1.13.0
>Reporter: Q Kang
>Priority: Major
>
> Take the following deduplication SQL program as an example:
> {code:java}
> SET table.exec.state.ttl=30s;
> INSERT INTO tmp.blackhole_order_done_log
> SELECT t.* FROM (
>   SELECT *,ROW_NUMBER() OVER(PARTITION BY suborderid ORDER BY procTime ASC) 
> AS rn
>   FROM rtdw_ods.kafka_order_done_log
> ) AS t WHERE rn = 1;
> {code}
> When using RocksDBStateBackend with incremental checkpoint enabled, the size 
> of deduplication state seems OK.
> FlinkCompactionFilter is also working, regarding to logs below:
> {code:java}
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter 
>[] - RocksDB filter native code log: Call 
> FlinkCompactionFilter::FilterV2 - Key: , Data: 017B3481026D01, Value 
> type: 0, State type: 1, TTL: 3 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter 
>[] - RocksDB filter native code log: Last access timestamp: 
> 1628673475181 ms, ttlWithoutOverflow: 3 ms, Current timestamp: 
> 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter 
>[] - RocksDB filter native code log: Decision: 1
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter 
>[] - RocksDB filter native code log: Call 
> FlinkCompactionFilter::FilterV2 - Key: , Data: 017B3484064901, Value 
> type: 0, State type: 1, TTL: 3 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter 
>[] - RocksDB filter native code log: Last access timestamp: 
> 1628673672777 ms, ttlWithoutOverflow: 3 ms, Current timestamp: 
> 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter 
>[] - RocksDB filter native code log: Decision: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter 
>[] - RocksDB filter native code log: Call 
> FlinkCompactionFilter::FilterV2 - Key: , Data: 017B3483341D01, Value 
> type: 0, State type: 1, TTL: 3 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter 
>[] - RocksDB filter native code log: Last access timestamp: 
> 1628673618973 ms, ttlWithoutOverflow: 3 ms, Current timestamp: 
> 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter 
>[] - RocksDB filter native code log: Decision: 1
> {code}
> However, after turning off incremental checkpoint, the state TTL seems not 
> effective at all: FlinkCompactionFilter logs are not printed, and the size of 
> deduplication state grows steadily up to several GBs (Kafka traffic is 
> somewhat heavy, at about 1K records per sec).
> In contrast, FsStateBackend always works well.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23983) JVM crash when running RocksDBStateBackend tests

2021-08-29 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-23983:
-
Summary: JVM crash when running RocksDBStateBackend tests  (was: JVM crash 
when running RocksDBStateBackendConfigTest)

> JVM crash when running RocksDBStateBackend tests
> 
>
> Key: FLINK-23983
> URL: https://issues.apache.org/jira/browse/FLINK-23983
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.0
>Reporter: Xintong Song
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.14.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=22855=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=11131
> You would need to compare the mvn logs "Running xxx" with "Test run xxx in 
> xxx" to find out the unfinished test.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend

2021-08-27 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17405659#comment-17405659
 ] 

Yun Tang commented on FLINK-23721:
--

[~lmagics] From the code implementation, what you described is really weird. You 
could use jmap to dump the task manager heap and check which 
`org.apache.flink.api.common.state.StateTtlConfig` configuration is actually taking 
effect (you may have many states, so please check as many StateTtlConfig instances 
as possible).
BTW, [~jark] does SQL have any additional condition for 
table.exec.state.ttl to take effect?
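
For reference, a state whose TTL is cleaned up by the RocksDB compaction filter is configured 
roughly like this in the DataStream API (a hedged sketch; the SQL runtime builds its own 
StateTtlConfig internally, so this only shows what to look for in the heap dump):
{code:java}
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

// Comparable to table.exec.state.ttl=30s. If the dumped StateTtlConfig instances do not
// enable the RocksDB compaction-filter cleanup, FlinkCompactionFilter is never invoked
// for that state, which would match the missing DEBUG logs.
StateTtlConfig ttlConfig =
        StateTtlConfig.newBuilder(Time.seconds(30))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .cleanupInRocksdbCompactFilter(1000) // re-read the current time every 1000 entries
                .build();
{code}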

> Flink SQL state TTL has no effect when using non-incremental 
> RocksDBStateBackend
> 
>
> Key: FLINK-23721
> URL: https://issues.apache.org/jira/browse/FLINK-23721
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Table SQL / Runtime
>Affects Versions: 1.13.0
>Reporter: Q Kang
>Priority: Major
>
> Take the following deduplication SQL program as an example:
> {code:java}
> SET table.exec.state.ttl=30s;
> INSERT INTO tmp.blackhole_order_done_log
> SELECT t.* FROM (
>   SELECT *,ROW_NUMBER() OVER(PARTITION BY suborderid ORDER BY procTime ASC) 
> AS rn
>   FROM rtdw_ods.kafka_order_done_log
> ) AS t WHERE rn = 1;
> {code}
> When using RocksDBStateBackend with incremental checkpoint enabled, the size 
> of deduplication state seems OK.
> FlinkCompactionFilter is also working, regarding to logs below:
> {code:java}
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter 
>[] - RocksDB filter native code log: Call 
> FlinkCompactionFilter::FilterV2 - Key: , Data: 017B3481026D01, Value 
> type: 0, State type: 1, TTL: 3 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter 
>[] - RocksDB filter native code log: Last access timestamp: 
> 1628673475181 ms, ttlWithoutOverflow: 3 ms, Current timestamp: 
> 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter 
>[] - RocksDB filter native code log: Decision: 1
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter 
>[] - RocksDB filter native code log: Call 
> FlinkCompactionFilter::FilterV2 - Key: , Data: 017B3484064901, Value 
> type: 0, State type: 1, TTL: 3 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter 
>[] - RocksDB filter native code log: Last access timestamp: 
> 1628673672777 ms, ttlWithoutOverflow: 3 ms, Current timestamp: 
> 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter 
>[] - RocksDB filter native code log: Decision: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter 
>[] - RocksDB filter native code log: Call 
> FlinkCompactionFilter::FilterV2 - Key: , Data: 017B3483341D01, Value 
> type: 0, State type: 1, TTL: 3 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter 
>[] - RocksDB filter native code log: Last access timestamp: 
> 1628673618973 ms, ttlWithoutOverflow: 3 ms, Current timestamp: 
> 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter 
>[] - RocksDB filter native code log: Decision: 1
> {code}
> However, after turning off incremental checkpoint, the state TTL seems not 
> effective at all: FlinkCompactionFilter logs are not printed, and the size of 
> deduplication state grows steadily up to several GBs (Kafka traffic is 
> somewhat heavy, at about 1K records per sec).
> In contrast, FsStateBackend always works well.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file

2021-08-27 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17405647#comment-17405647
 ] 

Yun Tang commented on FLINK-23886:
--

Thanks for Piotr's explanation. [~qingru zhang] do you have a legacy source or 
use the checkpoint lock in the same task?

Moreover, could you help with the things below to make this easier to figure 
out?
1. reproduce it on a supported Flink version (Flink 1.13 would be better)
2. share the code or provide a minimal example that reproduces this problem

> An exception is thrown out when recover job timers from checkpoint file
> ---
>
> Key: FLINK-23886
> URL: https://issues.apache.org/jira/browse/FLINK-23886
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: JING ZHANG
>Priority: Major
> Attachments: image-2021-08-25-16-38-04-023.png, 
> image-2021-08-25-16-38-12-308.png, image-2021-08-25-17-06-29-806.png, 
> image-2021-08-25-17-07-38-327.png
>
>
> A user report the bug in the [mailist. 
> |http://mail-archives.apache.org/mod_mbox/flink-user/202108.mbox/%3ccakmsf43j14nkjmgjuy4dh5qn2vbjtw4tfh4pmmuyvcvfhgf...@mail.gmail.com%3E]I
>  paste the content here.
> Setup Specifics:
>  Version: 1.6.2
>  RocksDB Map State
>  Timers stored in rocksdb
>   
>  When we have this job running for long periods of time like > 30 days, if 
> for some reason the job restarts, we encounter "Error while deserializing the 
> element". Is this a known issue fixed in later versions? I see some changes 
> to code for FLINK-10175, but we don't use any queryable state 
>   
>  Below is the stack trace
>   
>  org.apache.flink.util.FlinkRuntimeException: Error while deserializing the 
> element.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
> at 
> org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.(KeyGroupPartitionedPriorityQueue.java:89)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)
> at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
> at org.apache.flink.types.StringValue.readString(StringValue.java:769)
> at 
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
> at 
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:179)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:46)
> at 
> 

[jira] [Resolved] (FLINK-23800) Expose newly added RocksDB native metrics

2021-08-26 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-23800.
--
Resolution: Fixed

merged in master
bbcdb0b360f16647beb81206715ea54f7010b71e

> Expose newly added RocksDB native metrics
> -
>
> Key: FLINK-23800
> URL: https://issues.apache.org/jira/browse/FLINK-23800
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Since Flink bumped RocksDB version, we could expose more newly added RocksDB 
> native metrics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file

2021-08-26 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17405551#comment-17405551
 ] 

Yun Tang commented on FLINK-23886:
--

[~pnowojski] would you please share more information about the unsafe 
invocations of `onTimer()` and `processElement()`? From the user's report and the 
analysis of the restored RocksDB instance, we can see that some data from another 
state has been written into the timer state by mistake; could that happen in such 
an unsafe case?

> An exception is thrown out when recover job timers from checkpoint file
> ---
>
> Key: FLINK-23886
> URL: https://issues.apache.org/jira/browse/FLINK-23886
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: JING ZHANG
>Priority: Major
> Attachments: image-2021-08-25-16-38-04-023.png, 
> image-2021-08-25-16-38-12-308.png, image-2021-08-25-17-06-29-806.png, 
> image-2021-08-25-17-07-38-327.png
>
>
> A user report the bug in the [mailist. 
> |http://mail-archives.apache.org/mod_mbox/flink-user/202108.mbox/%3ccakmsf43j14nkjmgjuy4dh5qn2vbjtw4tfh4pmmuyvcvfhgf...@mail.gmail.com%3E]I
>  paste the content here.
> Setup Specifics:
>  Version: 1.6.2
>  RocksDB Map State
>  Timers stored in rocksdb
>   
>  When we have this job running for long periods of time like > 30 days, if 
> for some reason the job restarts, we encounter "Error while deserializing the 
> element". Is this a known issue fixed in later versions? I see some changes 
> to code for FLINK-10175, but we don't use any queryable state 
>   
>  Below is the stack trace
>   
>  org.apache.flink.util.FlinkRuntimeException: Error while deserializing the 
> element.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
> at 
> org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
> at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.(KeyGroupPartitionedPriorityQueue.java:89)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)
> at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
> at org.apache.flink.types.StringValue.readString(StringValue.java:769)
> at 
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
> at 
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:179)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:46)
> at 
> 

[jira] [Resolved] (FLINK-23992) Update doc version in release-1.13 to 1.13.2

2021-08-26 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-23992.
--
Resolution: Fixed

fixed in release-1.13.2
b5e593453fd57911827aa185e46fcbb8f8495260

> Update doc version in release-1.13 to 1.13.2
> 
>
> Key: FLINK-23992
> URL: https://issues.apache.org/jira/browse/FLINK-23992
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.2
>
>
> Current version in doc of  branch release-1.13 is still 1.13.0 and we should 
> fix it to 1.13.2



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-23992) Update doc version in release-1.13 to 1.13.2

2021-08-26 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-23992:


Assignee: Yun Tang

> Update doc version in release-1.13 to 1.13.2
> 
>
> Key: FLINK-23992
> URL: https://issues.apache.org/jira/browse/FLINK-23992
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.2
>
>
> Current version in doc of  branch release-1.13 is still 1.13.0 and we should 
> fix it to 1.13.2



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23992) Update doc version in release-1.13 to 1.13.2

2021-08-26 Thread Yun Tang (Jira)
Yun Tang created FLINK-23992:


 Summary: Update doc version in release-1.13 to 1.13.2
 Key: FLINK-23992
 URL: https://issues.apache.org/jira/browse/FLINK-23992
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Yun Tang
 Fix For: 1.13.2


The documented version on branch release-1.13 is still 1.13.0; we should bump it 
to 1.13.2.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-23983) JVM crash when running RocksDBStateBackendConfigTest

2021-08-25 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-23983:
-
Comment: was deleted

(was: I checked the hprof dump file 
https://heaphero.io/my-heap-report.jsp?p=YXJjaGl2ZWQvMjAyMS8wOC8yNi8tLS1fX3ctMS1zLWZsaW5rLXN0cmVhbWluZy1qYXZhLWphdmFfcGlkMTQ0MTk5Lmhwcm9mLTMtMzItMzUuanNvbg==
 which has no obvious RocksDB-related classes. I also did not find a core dump 
log in the artifact. From my experience, this might not be caused by a RocksDB core 
dump but by a Java error.)

> JVM crash when running RocksDBStateBackendConfigTest
> 
>
> Key: FLINK-23983
> URL: https://issues.apache.org/jira/browse/FLINK-23983
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.0
>Reporter: Xintong Song
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.14.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=22855=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=11131
> You would need to compare the mvn logs "Running xxx" with "Test run xxx in 
> xxx" to find out the unfinished test.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23983) JVM crash when running RocksDBStateBackendConfigTest

2021-08-25 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17404897#comment-17404897
 ] 

Yun Tang commented on FLINK-23983:
--

I checked the hprof dump file 
https://heaphero.io/my-heap-report.jsp?p=YXJjaGl2ZWQvMjAyMS8wOC8yNi8tLS1fX3ctMS1zLWZsaW5rLXN0cmVhbWluZy1qYXZhLWphdmFfcGlkMTQ0MTk5Lmhwcm9mLTMtMzItMzUuanNvbg==
 which has no obvious RocksDB-related classes. I also did not find a core dump 
log in the artifact. From my experience, this might not be caused by a RocksDB core 
dump but by a Java error.

> JVM crash when running RocksDBStateBackendConfigTest
> 
>
> Key: FLINK-23983
> URL: https://issues.apache.org/jira/browse/FLINK-23983
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.0
>Reporter: Xintong Song
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.14.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=22855=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=11131
> You would need to compare the mvn logs "Running xxx" with "Test run xxx in 
> xxx" to find out the unfinished test.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-23519) Aggregate State Backend Latency by State Level

2021-08-24 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-23519:


Assignee: Yun Tang

> Aggregate State Backend Latency by State Level
> --
>
> Key: FLINK-23519
> URL: https://issues.apache.org/jira/browse/FLINK-23519
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / State Backends
>Affects Versions: 1.13.0
>Reporter: Mason Chen
>Assignee: Yun Tang
>Priority: Minor
> Fix For: 1.14.0
>
>
> To make metrics aggregation easier, there should be a config to expose 
> something like `state.backend.rocksdb.metrics.column-family-as-variable` that 
> rocksdb provides to do aggregation across column families 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-metrics-column-family-as-variable]).
>  
> In this case of state backend latency, the variable exposed would be state 
> level instead column family. This makes it easier to aggregate by the various 
> state levels that are reported.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23519) Aggregate State Backend Latency by State Level

2021-08-24 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-23519:
-
Fix Version/s: 1.14.0

> Aggregate State Backend Latency by State Level
> --
>
> Key: FLINK-23519
> URL: https://issues.apache.org/jira/browse/FLINK-23519
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / State Backends
>Affects Versions: 1.13.0
>Reporter: Mason Chen
>Priority: Minor
> Fix For: 1.14.0
>
>
> To make metrics aggregation easier, there should be a config to expose 
> something like `state.backend.rocksdb.metrics.column-family-as-variable` that 
> rocksdb provides to do aggregation across column families 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-metrics-column-family-as-variable]).
>  
> In this case of state backend latency, the variable exposed would be state 
> level instead column family. This makes it easier to aggregate by the various 
> state levels that are reported.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23949) first incremental checkpoint after a savepoint will degenerate into a full checkpoint

2021-08-24 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17404141#comment-17404141
 ] 

Yun Tang commented on FLINK-23949:
--

Thanks for creating this ticket; I think this deserves a fix. I have 
assigned the ticket to you.
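
A minimal sketch of the guard proposed in the description below (the surrounding fields follow 
the snippet quoted there; this is not the final merged code):
{code:java}
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
    synchronized (materializedSstFiles) {
        // Only a checkpoint produced by this snapshot strategy registers its id in
        // materializedSstFiles. Savepoints (and checkpoints overtaken by a newer completed
        // one) are ignored, so the record of already-uploaded SST files survives an
        // intermediate savepoint and the next incremental checkpoint stays incremental.
        if (materializedSstFiles.containsKey(completedCheckpointId)) {
            materializedSstFiles
                    .keySet()
                    .removeIf(checkpointId -> checkpointId < completedCheckpointId);
            lastCompletedCheckpointId = completedCheckpointId;
        }
    }
}
{code}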

> first incremental checkpoint after a savepoint will degenerate into a full 
> checkpoint
> -
>
> Key: FLINK-23949
> URL: https://issues.apache.org/jira/browse/FLINK-23949
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.11.4, 1.12.5, 1.13.2
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-08-25-00-59-05-779.png
>
>
> In RocksIncrementalSnapshotStrategy we will record the uploaded rocksdb files 
> corresponding to the checkpoint id,and clean it in 
> _CheckpointListener#notifyCheckpointComplete ._
> {code:java}
> @Override
> public void notifyCheckpointComplete(long completedCheckpointId) {
> synchronized (materializedSstFiles) {
> if (completedCheckpointId > lastCompletedCheckpointId) {
> materializedSstFiles
> .keySet()
> .removeIf(checkpointId -> checkpointId < 
> completedCheckpointId);
> lastCompletedCheckpointId = completedCheckpointId;
> }
> }
> }{code}
>  
> This works well without savepoint, but when a savepoint is completed, it will 
> clean up the _materializedSstFiles_ of the previous checkpoint. It leads to 
> the first checkpoint after the savepoint must upload all files in rocksdb.
> !image-2021-08-25-00-59-05-779.png|width=1188,height=163!
> Solving the problem is also very simple, I propose to clean 
> _materializedSstFiles_ and update  _lastCompletedCheckpointId_ only when 
> {color:#ff}_materializedSstFiles.keySet().contains(completedCheckpointId)_{color}
>  .
> If a _completedCheckpointId_ is not in _materializedSstFiles.keySet()_ , 
> there are only two cases:
> 1. It is a checkpoint but there is a checkpoint with larger id number 
> completed before it
> 2. It is a savepoint (savepoint not produce by 
> RocksIncrementalSnapshotStrategy)
> In either case we don’t need clean _materializedSstFiles_ and update  
> _lastCompletedCheckpointId_  anymore.
> [~yunta] , [~trohrmann] , I have submitted a pull request to solve this 
> problem, please evaluate whether it is appropriate.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

