[jira] [Resolved] (FLINK-21504) State ownership: notify TMs about checkpoint subsumption

2021-12-09 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-21504.
--
Resolution: Fixed

merged in master: def3855690f8a5ac24a889dc4ed5909d4f9b31aa

> State ownership: notify TMs about checkpoint subsumption
> 
>
> Key: FLINK-21504
> URL: https://issues.apache.org/jira/browse/FLINK-21504
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Roman Khachatryan
>Assignee: Yun Tang
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
> Fix For: 1.15.0
>
>
> Goal: enable TMs to discard state after subsumption (FLINK-23139).
>  Proposed solution: include the earliest non-subsumed checkpoint ID in the 
> checkpoint confirmation notification.
> Code-wise, this would require passing the checkpoint ID through many existing 
> CheckpointListener implementations (e.g. Task). CheckpointListener is a 
> public interface and should not be concerned with retained checkpoints 
> (a runtime detail), so it is better to remove "implements" from such classes and 
> either call the methods directly or introduce a new (runtime) interface if 
> necessary.
>  
>  See [state ownership design 
> doc|https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit?usp=sharing],
>  in particular [subsumption 
> notifications|https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#heading=h.auqo5xe66sg5]
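A minimal sketch of the proposed shape, assuming a hypothetical runtime-side interface (the names `SubsumptionAwareListener` and `StateRegistry` are illustrative only, not Flink API): the confirmation carries the earliest non-subsumed checkpoint id, and the TM discards everything below it.

```java
import java.util.TreeMap;

// Sketch only: a runtime-internal notification that also carries the earliest
// non-subsumed checkpoint id, as the ticket proposes. Not the actual Flink code.
public class SubsumptionSketch {

    interface SubsumptionAwareListener {
        // latestCompleted: the checkpoint that was just confirmed.
        // earliestRetained: every checkpoint with a strictly smaller id is subsumed.
        void notifyCheckpointComplete(long latestCompleted, long earliestRetained);
    }

    /** Toy per-TM registry of state handles keyed by checkpoint id. */
    static class StateRegistry implements SubsumptionAwareListener {
        final TreeMap<Long, String> stateByCheckpoint = new TreeMap<>();

        void register(long checkpointId, String handle) {
            stateByCheckpoint.put(checkpointId, handle);
        }

        @Override
        public void notifyCheckpointComplete(long latestCompleted, long earliestRetained) {
            // Discard state of subsumed checkpoints (id < earliestRetained).
            stateByCheckpoint.headMap(earliestRetained, false).clear();
        }
    }

    /** Registers checkpoints 1..3, notifies, and returns how many survive. */
    static int retainedAfter(long earliestRetained) {
        StateRegistry registry = new StateRegistry();
        registry.register(1L, "chk-1");
        registry.register(2L, "chk-2");
        registry.register(3L, "chk-3");
        registry.notifyCheckpointComplete(3L, earliestRetained);
        return registry.stateByCheckpoint.size();
    }

    public static void main(String[] args) {
        // Checkpoints 1 and 2 are subsumed once 3 is the earliest retained one.
        System.out.println(retainedAfter(3L));
    }
}
```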



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-4266) Cassandra SplitOver Statebackend

2021-12-08 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17456215#comment-17456215
 ] 

Yun Tang commented on FLINK-4266:
-

[~foxss] What's the progress of this ticket? Can we close it now?

> Cassandra SplitOver Statebackend
> 
>
> Key: FLINK-4266
> URL: https://issues.apache.org/jira/browse/FLINK-4266
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / State Backends
>Affects Versions: 1.3.0
>Reporter: Chen Qin
>Assignee: Chen Qin
>Priority: Not a Priority
>  Labels: stale-assigned
>
> The current FileSystem state backend limits the total state size to local disk 
> space. Checkpoint state can outgrow local disk, for example in long-running 
> tasks that hold window contents for long periods of time. Pipelines need to 
> split state out to durable remote storage, possibly replicated across 
> different data centers.
> We drafted a design that uses the checkpoint id as a monotonically increasing 
> logical timestamp and performs range queries to fetch evicted state k/v pairs. 
> We also introduce a checkpoint-time commit and an eviction threshold that keep 
> "hot states" from hitting the remote DB on every update between adjacent 
> checkpoints: update logs are tracked and merged, and batched writes happen 
> only at checkpoint time. Lastly, we are looking for an eviction policy that 
> can identify "hot keys" in k/v state and lazily load "cold keys" from remote 
> storage (e.g. Cassandra).
> For now, we don't have a good story for data retirement. We might have to keep 
> state forever until a command is run manually to clean up per-job state data. 
> Some of these features may relate to the incremental checkpointing feature, 
> which we hope to align with.
> Comments welcome; I will try to put up a draft design doc after gathering some 
> feedback.
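The batching idea described above can be sketched as follows. This is an illustrative toy, not the proposed implementation: the class name `BufferedRemoteState` is hypothetical, and a plain `HashMap` stands in for the remote store (Cassandra). Updates between checkpoints only touch an in-memory log, where later writes merge over earlier ones, and the merged log is flushed as one batch per checkpoint.

```java
import java.util.HashMap;
import java.util.Map;

// Toy sketch of buffering "hot state" updates between checkpoints and flushing
// one merged batch per checkpoint, instead of hitting the remote DB per update.
public class BufferedRemoteState {

    private final Map<String, String> pendingUpdates = new HashMap<>();
    private final Map<String, String> remoteStore; // stands in for Cassandra
    private int remoteWrites = 0;

    BufferedRemoteState(Map<String, String> remoteStore) {
        this.remoteStore = remoteStore;
    }

    /** Between checkpoints, only the in-memory log is touched; writes merge. */
    void put(String key, String value) {
        pendingUpdates.put(key, value);
    }

    /** On checkpoint: one batched round trip. The real design would tag the
     *  batch with the checkpoint id (unused in this toy). */
    void onCheckpoint(long checkpointId) {
        if (!pendingUpdates.isEmpty()) {
            remoteStore.putAll(pendingUpdates);
            remoteWrites++;
            pendingUpdates.clear();
        }
    }

    int remoteWriteCount() {
        return remoteWrites;
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        BufferedRemoteState state = new BufferedRemoteState(store);
        state.put("k", "v1");
        state.put("k", "v2"); // merged in memory, never sent individually
        state.onCheckpoint(1L);
        System.out.println(store.get("k") + " / " + state.remoteWriteCount());
    }
}
```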



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-3947) Provide low level access to RocksDB state backend

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-3947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-3947.
---
Resolution: Information Provided

{{MapState}} and {{KeyedStateBackend#getKeys}} have been implemented. Closing this 
ticket due to lack of activity.

> Provide low level access to RocksDB state backend
> -
>
> Key: FLINK-3947
> URL: https://issues.apache.org/jira/browse/FLINK-3947
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> The current state API is limiting and some implementations are not as 
> efficient as they could be, particularly when working with large states. For 
> instance, a ListState is append only.  You cannot remove values from the 
> list.  And the RocksDBListState get() implementation reads all list values 
> from RocksDB instead of returning an Iterable that only reads values as 
> needed.
> Furthermore, RocksDB is an ordered KV store, yet there is no ordered map 
> state API with an ability to iterate over the stored values in order.
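The kind of ordered map state the reporter asks for can be mimicked on the heap with a `TreeMap`, which (like RocksDB's sorted key layout) makes in-order iteration and range scans cheap. This is a sketch under that analogy; `OrderedMapStateSketch` and `keysInRange` are hypothetical names, not a Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketch: what an ordered map state with range iteration could look like,
// using TreeMap to stand in for RocksDB's sorted key-value layout.
public class OrderedMapStateSketch {

    /** Returns the keys in [from, to), in sorted order, as a range scan would. */
    static List<String> keysInRange(TreeMap<String, Long> state, String from, String to) {
        return new ArrayList<>(state.subMap(from, true, to, false).keySet());
    }

    public static void main(String[] args) {
        TreeMap<String, Long> state = new TreeMap<>();
        state.put("b", 2L);
        state.put("a", 1L);
        state.put("c", 3L);
        // Iteration order follows key order regardless of insertion order.
        System.out.println(keysInRange(state, "a", "c"));
    }
}
```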



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-4413) Improve savepoint restore error messages

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-4413.
---
Resolution: Information Provided

Closing this ticket due to lack of activity.

> Improve savepoint restore error messages
> 
>
> Key: FLINK-4413
> URL: https://issues.apache.org/jira/browse/FLINK-4413
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Runtime / State Backends
>Affects Versions: 1.1.0
>Reporter: Gyula Fora
>Priority: Not a Priority
>
> Currently when savepoint restore fails due to some problems with parallelism 
> or the assigned uids the error messages contain only the job vertex id of the 
> problematic task.
> This makes this kind of problem very difficult to debug for more complex 
> topologies.
> I propose to add the user assigned task names to these error messages to make 
> this much easier for users.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-4493) Unify the snapshot output format for keyed-state backends

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-4493.
---
Resolution: Information Provided

Already implemented in FLINK-20976.

> Unify the snapshot output format for keyed-state backends
> -
>
> Key: FLINK-4493
> URL: https://issues.apache.org/jira/browse/FLINK-4493
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stefan Richter
>Priority: Not a Priority
>
> We could unify the output format for keyed-state backends implementations, 
> e.g. based on RocksDB and Heap, to write a single, common output format.
> For example, this would allow us to restore a state that was previously kept 
> in RocksDB on a heap-located backend and vice versa.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-4916) BufferSpiller should distribute files across temp directories

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-4916.
---
Resolution: Information Provided

{{BufferSpiller}} has been dropped.

> BufferSpiller should distribute files across temp directories
> -
>
> Key: FLINK-4916
> URL: https://issues.apache.org/jira/browse/FLINK-4916
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stephan Ewen
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Currently, the {{BufferSpiller}} puts files into one temp directory.
> It should be a simple extension to allow it to rotate files across temp 
> directories.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-5151) Add discussion about object mutations to heap-based state backend docs.

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-5151:
---

Assignee: Hangxiang Yu

> Add discussion about object mutations to heap-based state backend docs.
> ---
>
> Key: FLINK-5151
> URL: https://issues.apache.org/jira/browse/FLINK-5151
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / State Backends
>Affects Versions: 1.1.4, 1.2.0
>Reporter: Fabian Hueske
>Assignee: Hangxiang Yu
>Priority: Minor
>  Labels: auto-deprioritized-major
> Fix For: 1.15.0
>
>
> Flink's heap state backends store data as objects on the heap. Any object 
> mutations are hence reflected in the state.
> This can lead to unexpected behavior. For example, in case of sliding 
> windows, multiple windows hold references to the same object. Hence, all 
> windows are affected if such an object is modified, e.g., by a 
> {{WindowFunction}}, {{ReduceFunction}}, or {{FoldFunction}}, and might return 
> invalid results.
> We should add this information to the state backend documentation and also 
> point out that the RocksDB backend is not affected by this because all data 
> is serialized.
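The pitfall described above can be reduced to a few lines of plain Java. This is a minimal illustration, not Flink code: ordinary lists stand in for window state as a heap backend would hold it (by reference, without a serialization copy), so a mutation made while evaluating one window leaks into the other.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal demo of the heap-backend hazard: two windows share one state object,
// so mutating it through one window silently changes the other.
public class HeapStateMutation {

    static int observedInSecondWindow() {
        List<Integer> element = new ArrayList<>();
        element.add(1);

        // Sliding windows: both windows reference the SAME object, exactly as
        // a heap-based backend stores it (no copy, unlike RocksDB).
        List<List<Integer>> window1 = new ArrayList<>();
        List<List<Integer>> window2 = new ArrayList<>();
        window1.add(element);
        window2.add(element);

        // A user function mutates the element while processing window1 ...
        window1.get(0).add(2);

        // ... and window2 sees the change, producing invalid results.
        return window2.get(0).size();
    }

    public static void main(String[] args) {
        // Prints 2 even though window2 was never touched directly.
        System.out.println(observedInSecondWindow());
    }
}
```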



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-5151) Add discussion about object mutations to heap-based state backend docs.

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-5151:

Fix Version/s: 1.15.0

> Add discussion about object mutations to heap-based state backend docs.
> ---
>
> Key: FLINK-5151
> URL: https://issues.apache.org/jira/browse/FLINK-5151
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / State Backends
>Affects Versions: 1.1.4, 1.2.0
>Reporter: Fabian Hueske
>Priority: Minor
>  Labels: auto-deprioritized-major
> Fix For: 1.15.0
>
>
> Flink's heap state backends store data as objects on the heap. Any object 
> mutations are hence reflected in the state.
> This can lead to unexpected behavior. For example, in case of sliding 
> windows, multiple windows hold references to the same object. Hence, all 
> windows are affected if such an object is modified, e.g., by a 
> {{WindowFunction}}, {{ReduceFunction}}, or {{FoldFunction}}, and might return 
> invalid results.
> We should add this information to the state backend documentation and also 
> point out that the RocksDB backend is not affected by this because all data 
> is serialized.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-5373) Extend Unit Tests for StateAssignmentOperation

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-5373.
---
Resolution: Information Provided

{{StateAssignmentOperationTest}} has been added.

> Extend Unit Tests for StateAssignmentOperation
> --
>
> Key: FLINK-5373
> URL: https://issues.apache.org/jira/browse/FLINK-5373
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Aljoscha Krettek
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> The legacy savepoint restore end-to-end test uncovered a slight problem with 
> null pointers that is fixed by this commit: 
> https://github.com/apache/flink/commit/74df7631316e78af39a5416e12c1adc8a46d87fe
> We should extend unit tests to catch this case in the future.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-5374) Extend Unit Tests for RegisteredBackendStateMetaInfo

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-5374.
---
Resolution: Information Provided

{{RegisteredBackendStateMetaInfo}} has been dropped.

> Extend Unit Tests for RegisteredBackendStateMetaInfo
> 
>
> Key: FLINK-5374
> URL: https://issues.apache.org/jira/browse/FLINK-5374
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Aljoscha Krettek
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> The legacy savepoint restore end-to-end test uncovered a slight problem with 
> the compatibility check of the meta info: 
> https://github.com/apache/flink/commit/d1eaa1ee41728e6d788f1e914cb0568a874a6f32
> We should extend unit tests to catch this case in the future.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-5436) UDF state without CheckpointedRestoring can result in restarting loop

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-5436.
---
Resolution: Information Provided

The API of {{CheckpointedRestoring}} has been dropped.

> UDF state without CheckpointedRestoring can result in restarting loop
> -
>
> Key: FLINK-5436
> URL: https://issues.apache.org/jira/browse/FLINK-5436
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ufuk Celebi
>Priority: Not a Priority
>
> When restoring a job with Checkpointed state and not implementing the new 
> CheckpointedRestoring interface, the job will be restarted over and over 
> again (given the respective restarting strategy).
> Since this is not recoverable, we should immediately fail the job.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-5437) Make CheckpointedRestoring error message more detailed

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-5437.
---
Resolution: Information Provided

The API of {{CheckpointedRestoring}} has been dropped.

> Make CheckpointedRestoring error message more detailed
> --
>
> Key: FLINK-5437
> URL: https://issues.apache.org/jira/browse/FLINK-5437
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ufuk Celebi
>Priority: Not a Priority
>
> When restoring Checkpointed state without implementing CheckpointedRestoring, 
> the job fails with the following Exception:
> {code}
> java.lang.Exception: Found UDF state but operator is not instance of 
> CheckpointedRestoring
> {code}
> I think we should make this error message more detailed.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-5439) Adjust max parallelism when migrating

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-5439.
---
Resolution: Information Provided

Closing this ticket due to lack of activity; v1 savepoints are too old by now.

> Adjust max parallelism when migrating
> -
>
> Key: FLINK-5439
> URL: https://issues.apache.org/jira/browse/FLINK-5439
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ufuk Celebi
>Priority: Not a Priority
>
> When migrating from v1 savepoints which don't have the notion of a max 
> parallelism, the job needs to explicitly set the max parallelism to the 
> parallelism of the savepoint.
> [~stefanrichte...@gmail.com] If this is not trivially implemented, let's close 
> this as won't fix.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-5440) Misleading error message when migrating and scaling down from savepoint

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-5440.
---
Resolution: Information Provided

Closing this ticket due to lack of activity.

> Misleading error message when migrating and scaling down from savepoint
> ---
>
> Key: FLINK-5440
> URL: https://issues.apache.org/jira/browse/FLINK-5440
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ufuk Celebi
>Priority: Not a Priority
>
> When resuming from a 1.1 savepoint with 1.2 and reducing the parallelism 
> (and correctly setting the max parallelism), the error message says something 
> about a missing operator, which is misleading. Restoring from the same 
> savepoint with the savepoint parallelism works as expected.
> Instead, it should state that this kind of operation is not possible.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-5707) Find better keys for backend configuration parameters for state backends

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-5707.
---
Resolution: Information Provided

Closing this ticket, as this is already done in current Flink.

> Find better keys for backend configuration parameters for state backends
> 
>
> Key: FLINK-5707
> URL: https://issues.apache.org/jira/browse/FLINK-5707
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Priority: Not a Priority
>
> Currently, some config keys for the backends are confusing or even misleading 
> and could be renamed. For example
> `state.backend.fs.checkpointdir` -> `state.backend.checkpoints.dir`
> `state.backend.rocksdb.checkpointdir` -> `state.backend.rocksdb.workdir`
> `state.checkpoints.dir`
> This would reflect their purposes much better.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-5730) Users can concurrently modify state metadata of RocksDB asynchronous snapshots

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-5730.
---
Resolution: Invalid

Current RocksDB snapshots copy the k/v state metadata in the synchronous phase 
to avoid this problem. Closing this ticket.

> Users can concurrently modify state metadata of RocksDB asynchronous snapshots
> --
>
> Key: FLINK-5730
> URL: https://issues.apache.org/jira/browse/FLINK-5730
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Stefan Richter
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> The current implementation of asynchronous snapshots in RocksDB iterates the 
> original state metadata structures as part of the asynchronous snapshot. 
> Users could potentially modify the state metadata concurrently (e.g. by 
> registering a new state while the snapshot is running), thereby corrupting 
> the metadata.
> For the way most users are registering their states (at the start of the 
> task), this is not a problem. However, if state is conditionally registered 
> at some later point in time this can be problematic.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-5761) ClassNotFoundException during cancel job

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-5761.
---
Resolution: Information Provided

Closing this ticket due to lack of activity.

> ClassNotFoundException during cancel job
> 
>
> Key: FLINK-5761
> URL: https://issues.apache.org/jira/browse/FLINK-5761
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.1.4
>Reporter: Andrey
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Steps to reproduce:
> * setup flink cluster in HA mode
> * submit job with rocksdb state backend and enableFullyAsyncSnapshots
> * send some load to the job
> * in the middle of processing cancel job using the command: ./flink cancel 
> 
> In the JobManager logs:
> {code}
> 2017-02-09 13:55:49,511 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping 
> checkpoint coordinator for job e140ad8a3deeae991a9bbe080222d3f6
> 2017-02-09 13:55:49,517 INFO  
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
> Removed job graph e140ad8a3deeae991a9bbe080222d3f6 from ZooKeeper.
> 2017-02-09 13:55:49,519 WARN  
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - 
> Failed to discard checkpoint 1.
> java.lang.Exception: Could not discard the completed checkpoint Checkpoint 1 
> @ 1486648542769 for e140ad8a3deeae991a9bbe080222d3f6.
> at 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore$1.processResult(ZooKeeperCompletedCheckpointStore.java:308)
> at 
> org.apache.flink.shaded.org.apache.curator.framework.imps.Backgrounding$1$1.run(Backgrounding.java:109)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend$FinalFullyAsyncSnapshot
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:65)
> at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
> at 
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
> at java.util.HashMap.readObject(HashMap.java:1396)
> at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1714)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
> at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:291)
> at 
> org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
> at 
> org.apache.flink.runtime.checkpoint.SubtaskState.discard(SubtaskState.java:85)
> at 
> 

[jira] [Closed] (FLINK-6054) Add new state backend that dynamically stores data in memory and external storage

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-6054.
---
Resolution: Information Provided

The idea is similar to the spillable state backend: 
https://issues.apache.org/jira/browse/FLINK-12692
Closing this ticket due to lack of activity and a missing design document.

> Add new state backend that dynamically stores data in memory and external 
> storage
> -
>
> Key: FLINK-6054
> URL: https://issues.apache.org/jira/browse/FLINK-6054
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / State Backends
>Reporter: Sergio Esteves
>Priority: Not a Priority
>
> This feature would be useful for memory-intensive applications that need to 
> maintain state for long periods of time; e.g., event-time streaming 
> applications with long-lived windows that tolerate large amounts of lateness.
> This feature would allow to scale the state and, in the example above, 
> tolerate a very large (possibly unbounded) amount of lateness, which can be 
> useful in a set of scenarios, like the one of Photon in the Google 
> Advertising System (white paper: "Photon: Fault-tolerant and Scalable Joining 
> of Continuous Data Streams").
> In a nutshell, the idea would be to have a quota for the maximum memory that 
> a state cell (different keys and namespaces) can occupy. When that quota gets 
> fully occupied, new state data would be written out to disk. Then, when state 
> needs to be retrieved, data is read entirely from memory - persisted data is 
> loaded into memory in the background at the same time that data pertaining to 
> the quota is being fetched (this reduces I/O overhead).
> Different policies, defining when to offload/load data from/to memory, can be 
> implemented to govern the overall memory utilization. We already have a 
> preliminary implementation with promising results in terms of memory savings 
> (in the context of streaming applications with windows that tolerate 
> lateness).
> More details are to be given soon through a design document.
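The quota idea sketched above can be illustrated with an access-ordered `LinkedHashMap`: once the in-memory part exceeds its quota, the least-recently-used entries spill to a "remote" store, and a cold key is lazily loaded back on access. Everything here is a hypothetical stand-in (entry count for byte size, a `HashMap` for external storage); it is not the preliminary implementation the reporter mentions.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy sketch of a spillable map: a bounded hot set in memory, LRU entries
// spilled to remote storage, and lazy load of cold keys on access.
public class SpillableMapSketch {

    private final int quota;
    private final Map<String, String> remote = new HashMap<>(); // "external storage"
    private final LinkedHashMap<String, String> hot;

    SpillableMapSketch(int quota) {
        this.quota = quota;
        // accessOrder=true: the eldest entry is the least recently used.
        this.hot = new LinkedHashMap<>(16, 0.75f, true);
    }

    void put(String key, String value) {
        hot.put(key, value);
        spillIfNeeded();
    }

    String get(String key) {
        String v = hot.get(key);
        if (v == null && remote.containsKey(key)) {
            v = remote.remove(key); // lazy load of a "cold" key
            hot.put(key, v);
            spillIfNeeded();
        }
        return v;
    }

    private void spillIfNeeded() {
        while (hot.size() > quota) {
            Map.Entry<String, String> eldest = hot.entrySet().iterator().next();
            remote.put(eldest.getKey(), eldest.getValue());
            hot.remove(eldest.getKey());
        }
    }

    int inMemory() { return hot.size(); }
    int spilled()  { return remote.size(); }

    public static void main(String[] args) {
        SpillableMapSketch map = new SpillableMapSketch(2);
        map.put("a", "1");
        map.put("b", "2");
        map.put("c", "3");                 // "a" is LRU and spills
        System.out.println(map.inMemory() + " hot, " + map.spilled() + " spilled");
        System.out.println(map.get("a"));  // loaded back; "b" spills instead
    }
}
```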



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-6408) Repeated loading of configuration files in hadoop filesystem code paths

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-6408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-6408.
---
Resolution: Information Provided

Closing this ticket due to lack of activity.

> Repeated loading of configuration files in hadoop filesystem code paths
> ---
>
> Key: FLINK-6408
> URL: https://issues.apache.org/jira/browse/FLINK-6408
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Runtime / Task
>Affects Versions: 1.2.1
>Reporter: Stephen Gran
>Priority: Not a Priority
>
> We are running flink on mesos in AWS.  Checkpointing is enabled with an s3 
> backend, configured via the hadoop s3a filesystem implementation and done 
> every second.
> We are seeing roughly 3 million log events per hour from a relatively small 
> job, and it appears that this is because every S3 copy event reloads the 
> Hadoop configuration, which in turn reloads the Flink configuration. The 
> Flink configuration loader outputs each key/value pair every time it is 
> invoked, leading to this volume of logs.
> While the logging is relatively easy to deal with - just a log4j setting - 
> the behaviour is probably suboptimal.  It seems that the configuration loader 
> could easily be changed over to a singleton pattern to prevent the constant 
> rereading of files.
> If you're interested, we can probably knock up a patch for this in a 
> relatively short time.
> Cheers,
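The singleton the reporter suggests could look like the sketch below, using the initialization-on-demand holder idiom so the files are read (and logged) at most once per JVM. The names `ConfigHolder` and `loadFromDisk` are hypothetical; this is not the actual Flink `GlobalConfiguration` code.

```java
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: load configuration once and reuse it, instead of re-reading (and
// re-logging) the files on every filesystem operation.
public class ConfigHolder {

    static final AtomicInteger LOAD_COUNT = new AtomicInteger();

    /** Lazy, thread-safe singleton via the initialization-on-demand holder idiom. */
    private static class Holder {
        static final Properties CONFIG = loadFromDisk();
    }

    static Properties get() {
        return Holder.CONFIG; // the files are read at most once per JVM
    }

    private static Properties loadFromDisk() {
        // The real loader would read flink-conf here and log each key/value pair.
        LOAD_COUNT.incrementAndGet();
        Properties p = new Properties();
        p.setProperty("state.checkpoints.dir", "s3a://bucket/checkpoints");
        return p;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1_000; i++) {
            ConfigHolder.get(); // e.g. once per s3a copy operation
        }
        // Loaded exactly once despite 1000 lookups.
        System.out.println(ConfigHolder.LOAD_COUNT.get());
    }
}
```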



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-6481) Encapsulate different snapshot algorithms (full, incremental) slightly clearer into strategy pattern

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-6481.
---
Resolution: Information Provided

Current Flink already has {{RocksFullSnapshotStrategy}} and 
{{RocksIncrementalSnapshotStrategy}}. Closing this ticket due to lack of 
activity.

> Encapsulate different snapshot algorithms (full, incremental) slightly 
> clearer into strategy pattern
> 
>
> Key: FLINK-6481
> URL: https://issues.apache.org/jira/browse/FLINK-6481
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stefan Richter
>Priority: Not a Priority
>
> We could encapsulate the different snapshot algorithms (full, incremental) 
> slightly more clearly into a strategy pattern, and also try to separate their 
> implementing (currently inner) classes clearly from RocksDB's remaining 
> business logic.
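The shape of the proposed refactoring can be sketched as a classic strategy pattern. Flink did later gain `RocksFullSnapshotStrategy` and `RocksIncrementalSnapshotStrategy` (as the closing comment notes); the classes below are simplified stand-ins, not the real implementations.

```java
// Strategy-pattern sketch: full and incremental snapshots as interchangeable
// algorithms behind one interface, keeping the backend free of their details.
public class SnapshotStrategies {

    interface SnapshotStrategy {
        String snapshot(long checkpointId);
    }

    static class FullSnapshotStrategy implements SnapshotStrategy {
        @Override
        public String snapshot(long checkpointId) {
            return "full-" + checkpointId; // write the complete state
        }
    }

    static class IncrementalSnapshotStrategy implements SnapshotStrategy {
        private long lastCheckpointId = -1;

        @Override
        public String snapshot(long checkpointId) {
            // Only changes since the previous checkpoint would be uploaded.
            String result = "delta-" + lastCheckpointId + ".." + checkpointId;
            lastCheckpointId = checkpointId;
            return result;
        }
    }

    /** The backend just delegates; the algorithm choice is pluggable. */
    static String runCheckpoint(SnapshotStrategy strategy, long id) {
        return strategy.snapshot(id);
    }

    public static void main(String[] args) {
        System.out.println(runCheckpoint(new FullSnapshotStrategy(), 7L));
        SnapshotStrategy inc = new IncrementalSnapshotStrategy();
        System.out.println(runCheckpoint(inc, 7L));
        System.out.println(runCheckpoint(inc, 8L));
    }
}
```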



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-19918) RocksIncrementalCheckpointRescalingTest.testScalingDown/Up fails on Windows with NPE

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-19918.

Resolution: Information Provided

Closing this ticket due to lack of activity.

> RocksIncrementalCheckpointRescalingTest.testScalingDown/Up fails on Windows 
> with NPE
> 
>
> Key: FLINK-19918
> URL: https://issues.apache.org/jira/browse/FLINK-19918
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Andrey Zagrebin
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> testScalingDown:
> {code:java}
> java.lang.NullPointerExceptionjava.lang.NullPointerException at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.close(AbstractStreamOperatorTestHarness.java:656)
>  at 
> org.apache.flink.contrib.streaming.state.RocksIncrementalCheckpointRescalingTest.closeHarness(RocksIncrementalCheckpointRescalingTest.java:357)
>  at 
> org.apache.flink.contrib.streaming.state.RocksIncrementalCheckpointRescalingTest.testScalingDown(RocksIncrementalCheckpointRescalingTest.java:276)
> {code}
>  
> testScalingUp:
> {code:java}
> java.lang.IllegalArgumentException: Cannot use the root directory for 
> checkpoints.java.lang.IllegalArgumentException: Cannot use the root directory 
> for checkpoints.
>  at 
> org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.validatePath(AbstractFileStateBackend.java:195)
>  at 
> org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.(AbstractFileStateBackend.java:109)
>  at 
> org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.(AbstractFileStateBackend.java:95)
>  at 
> org.apache.flink.runtime.state.filesystem.FsStateBackend.(FsStateBackend.java:339)
>  at 
> org.apache.flink.runtime.state.filesystem.FsStateBackend.(FsStateBackend.java:216)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.(RocksDBStateBackend.java:238)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.(RocksDBStateBackend.java:205)
>  at 
> org.apache.flink.contrib.streaming.state.RocksIncrementalCheckpointRescalingTest.getStateBackend(RocksIncrementalCheckpointRescalingTest.java:390)
>  at 
> org.apache.flink.contrib.streaming.state.RocksIncrementalCheckpointRescalingTest.testScalingUp(RocksIncrementalCheckpointRescalingTest.java:106)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498) at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20) at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>  at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363) at 
> org.junit.runners.Suite.runChild(Suite.java:128) at 
> org.junit.runners.Suite.runChild(Suite.java:27) at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363) at 
> org.junit.runner.JUnitCore.run(JUnitCore.java:137) at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
>  at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>  at 
> 

[jira] [Closed] (FLINK-21593) RocksDBListStatePerformanceTest.testRocksDbListStateAPI fail because of timeout

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-21593.

Resolution: Information Provided

{{RocksDBListStatePerformanceTest}} has been removed.

> RocksDBListStatePerformanceTest.testRocksDbListStateAPI fail because of 
> timeout
> ---
>
> Key: FLINK-21593
> URL: https://issues.apache.org/jira/browse/FLINK-21593
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.3
>Reporter: Guowei Ma
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=14080=logs=3b6ec2fd-a816-5e75-c775-06fb87cb6670=b33fdd4f-3de5-542e-2624-5d53167bb672
> {code:java}
> [ERROR] 
> testRocksDbListStateAPIs(org.apache.flink.contrib.streaming.state.benchmark.RocksDBListStatePerformanceTest)
>   Time elapsed: 8.245 s  <<< ERROR!
> org.junit.runners.model.TestTimedOutException: test timed out after 2000 
> milliseconds
>   at org.rocksdb.RocksDB.open(Native Method)
>   at org.rocksdb.RocksDB.open(RocksDB.java:231)
>   at 
> org.apache.flink.contrib.streaming.state.benchmark.RocksDBListStatePerformanceTest.testRocksDbListStateAPIs(RocksDBListStatePerformanceTest.java:96)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-6647) Fail-fast on invalid RocksDBStateBackend configuration

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-6647.
---
Resolution: Information Provided

The current {{FileSystemCheckpointStorage}} already checks whether the path URI 
is legal on initialization.

> Fail-fast on invalid RocksDBStateBackend configuration
> --
>
> Key: FLINK-6647
> URL: https://issues.apache.org/jira/browse/FLINK-6647
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Andrey
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
> Currently:
> * setup "state.backend.rocksdb.checkpointdir=hdfs:///some/base/path/hdfs"
> * setup backend: state.backend: 
> "org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory"
> * rocksdb doesn't support hdfs backend so in logs:
> {code}
> 2017-05-19 15:42:33,737 ERROR 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Local DB files 
> directory '/some/base/path/hdfs' does not exist and cannot be created.
> {code}
> * however job continue execution and IOManager temp directory will be picked 
> up for rocksdb files.
> There are several issues with such approach:
> * after "ERROR" message printed and before developer fixes configuration, 
> /tmp directory/partition might run out of disk space.
> * if hdfs base path is the same as local path, then no errors in logs and 
> rocksdb files will be written into an incorrect location. For example: 
> "hdfs:///home/flink/data" will cause an issue.
> Expected:
> * validate URI and throw IllegalArgumentException like already implemented in 
> "RocksDBStateBackend.setDbStoragePaths" method.
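
The fail-fast check requested above can be sketched as follows. This is an illustrative, self-contained example, not Flink's actual implementation; the class name {{DbStoragePathValidator}} and method {{validateLocalPath}} are made up. It rejects URIs with a non-local scheme up front instead of silently falling back to a temp directory:

```java
import java.net.URI;
import java.net.URISyntaxException;

// Minimal sketch of fail-fast validation for RocksDB local storage paths.
// RocksDB only works with local directories, so a URI carrying a non-"file"
// scheme (e.g. "hdfs://...") is rejected immediately with an
// IllegalArgumentException rather than being silently misinterpreted.
public class DbStoragePathValidator {

    public static String validateLocalPath(String path) {
        try {
            URI uri = new URI(path);
            String scheme = uri.getScheme();
            if (scheme != null && !"file".equals(scheme)) {
                throw new IllegalArgumentException(
                        "Path '" + path + "' has scheme '" + scheme
                                + "'; RocksDB local storage requires a local path.");
            }
            return uri.getPath();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid path: " + path, e);
        }
    }

    public static void main(String[] args) {
        // A plain local path passes through unchanged.
        System.out.println(validateLocalPath("/data/flink/rocksdb"));
        // An HDFS URI fails fast instead of being used as a local directory.
        try {
            validateLocalPath("hdfs:///some/base/path/hdfs");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```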



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-7883) Make savepoints atomic with respect to state and side effects

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-7883.
---
Resolution: Information Provided

Closing this as we already have the feature of stopping with savepoint.

> Make savepoints atomic with respect to state and side effects
> -
>
> Key: FLINK-7883
> URL: https://issues.apache.org/jira/browse/FLINK-7883
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Connectors / Kafka, Runtime / State 
> Backends
>Affects Versions: 1.3.2, 1.4.0
>Reporter: Antoine Philippot
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> For a cancel with savepoint command, the JobManager trigger the cancel call 
> once the savepoint is finished, but during the savepoint execution, kafka 
> source continue to poll new messages which will not be part of the savepoint 
> and will be replayed on the next application start.
> A solution could be to stop fetching the source stream task before triggering 
> the savepoint.
> I suggest to add an interface {{StoppableFetchingSourceFunction}} with a 
> method {{stopFetching}} that existant SourceFunction implementations could 
> implement.
> We can add a {{stopFetchingSource}} property in 
>  {{CheckpointOptions}} class to pass the desired behaviour from 
> {{JobManager.handleMessage(CancelJobWithSavepoint)}} to 
> {{SourceStreamTask.triggerCheckpoint}}
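
The interface proposed in the ticket can be sketched like this. Only the names {{StoppableFetchingSourceFunction}} and {{stopFetching}} come from the ticket text; the toy {{PollingSource}} is a made-up stand-in showing the intended contract (stop polling new records so a subsequent savepoint sees a quiescent source):

```java
// Hedged sketch of the interface proposed in FLINK-7883; this was never
// merged into Flink in this form.
public class StoppableFetchingExample {

    interface StoppableFetchingSourceFunction {
        // Signals the source to stop polling new records so that a
        // savepoint triggered afterwards does not miss in-flight data.
        void stopFetching();
    }

    // Toy source that polls until stopFetching() is called.
    static class PollingSource implements StoppableFetchingSourceFunction {
        private volatile boolean fetching = true;
        private int recordsEmitted = 0;

        void run(int maxPolls) {
            for (int i = 0; i < maxPolls && fetching; i++) {
                recordsEmitted++; // stand-in for emitting a polled record
            }
        }

        @Override
        public void stopFetching() {
            fetching = false;
        }

        int getRecordsEmitted() {
            return recordsEmitted;
        }
    }

    public static void main(String[] args) {
        PollingSource source = new PollingSource();
        source.run(3);          // emits 3 records
        source.stopFetching();  // e.g. invoked before triggering a savepoint
        source.run(3);          // emits nothing more
        System.out.println(source.getRecordsEmitted()); // 3
    }
}
```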



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-8969) Move TimerService into state backend

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-8969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-8969.
---
Resolution: Information Provided

Closing this ticket due to lack of activity and details.

> Move TimerService into state backend
> 
>
> Key: FLINK-8969
> URL: https://issues.apache.org/jira/browse/FLINK-8969
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Bowen Li
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
> upon discussion with [~aljoscha]. More details need to be added here



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-9116) Introduce getAll and removeAll for MapState

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-9116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-9116.
---
Resolution: Information Provided

There is no actual need for these two methods currently. Closing this ticket 
due to lack of activity.

> Introduce getAll and removeAll for MapState
> ---
>
> Key: FLINK-9116
> URL: https://issues.apache.org/jira/browse/FLINK-9116
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / State Backends
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned, 
> pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We have supported {{putAll(List)}} in {{MapState}}, I think we should also 
> support {{getAll(Iterable)}} and {{removeAll(Iterable)}} in {{MapState}}, it 
> can be convenient in some scenario.
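
The proposed semantics can be sketched with a plain {{java.util.Map}} standing in for {{MapState}} (an assumption for illustration only; the real feature would sit on the {{MapState}} interface next to the existing {{putAll}}):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the proposed getAll(Iterable)/removeAll(Iterable) semantics.
// A HashMap stands in for Flink's MapState here.
public class MapStateBatchOps {

    static <K, V> List<V> getAll(Map<K, V> state, Iterable<K> keys) {
        List<V> values = new ArrayList<>();
        for (K key : keys) {
            values.add(state.get(key)); // null for absent keys, like MapState#get
        }
        return values;
    }

    static <K, V> void removeAll(Map<K, V> state, Iterable<K> keys) {
        for (K key : keys) {
            state.remove(key);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        state.put("a", 1);
        state.put("b", 2);
        System.out.println(getAll(state, List.of("a", "b"))); // [1, 2]
        removeAll(state, List.of("a", "b"));
        System.out.println(state.isEmpty()); // true
    }
}
```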



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-9268) RockDB errors from WindowOperator

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-9268.
---
Resolution: Information Provided

The JNI limit has been resolved via 
https://github.com/facebook/rocksdb/pull/3850

> RockDB errors from WindowOperator
> -
>
> Key: FLINK-9268
> URL: https://issues.apache.org/jira/browse/FLINK-9268
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Runtime / State Backends
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
> The job has no sinks, one Kafka source, does a windowing based on session and 
> uses processing time. The job fails with the error given below after running 
> for few hours. The only way to recover from this error is to cancel the job 
> and start a new one.
> Using S3 backend for externalized checkpoints.
> A representative job DAG:
> val streams = sEnv
>  .addSource(makeKafkaSource(config))
>  .map(makeEvent)
>  .keyBy(_.get(EVENT_GROUP_ID))
>  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
>  .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
>  .apply(makeEventsList)
> .addSink(makeNoOpSink)
> A representative config:
> state.backend=rocksDB
> checkpoint.enabled=true
> external.checkpoint.enabled=true
> checkpoint.mode=AT_LEAST_ONCE
> checkpoint.interval=90
> checkpoint.timeout=30
> Error:
> TimerException\{java.lang.NegativeArraySizeException}
>  at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NegativeArraySizeException
>  at org.rocksdb.RocksDB.get(Native Method)
>  at org.rocksdb.RocksDB.get(RocksDB.java:810)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
>  at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
>  at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
>  at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-9328) RocksDBStateBackend might use PlaceholderStreamStateHandle to restore due to StateBackendTestBase class not register snapshots in some UTs

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-9328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-9328.
---
Resolution: Duplicate

Closing this ticket as already resolved by FLINK-9887.

> RocksDBStateBackend might use PlaceholderStreamStateHandle to restore due to 
> StateBackendTestBase class not register snapshots in some UTs
> --
>
> Key: FLINK-9328
> URL: https://issues.apache.org/jira/browse/FLINK-9328
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.4.2, 1.5.4
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Not a Priority
>  Labels: pull-request-available, stale-assigned
>
> Currently, StateBackendTestBase class does not register snapshots to 
> SharedStateRegistry in testValueState, testListState, testReducingState, 
> testFoldingState and testMapState UTs, which may cause RocksDBStateBackend to 
> restore from PlaceholderStreamStateHandle during the 2nd restore procedure if 
> one specific sst file both existed in the 1st snapshot and the 2nd snapshot 
> handle.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-9831) Too many open files for RocksDB

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-9831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-9831.
---
Resolution: Information Provided

Closing this due to lack of activity.


> Too many open files for RocksDB
> ---
>
> Key: FLINK-9831
> URL: https://issues.apache.org/jira/browse/FLINK-9831
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.5.0
>Reporter: Sayat Satybaldiyev
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
> Attachments: flink_open_files.txt
>
>
> While running only one Flink job, which is backed by RocksDB with 
> checkpoining to HDFS we encounter an exception that TM cannot access the SST 
> file because the process has too many open files. However, we have already 
> increased the file soft/hard limit on the machine.
> Number open files for TM on the machine:
>  
> {code:java}
> lsof -p 23301|wc -l
> 8241{code}
>  
> Instance limits
>  
> {code:java}
> ulimit -a
> core file size (blocks, -c) 0
> data seg size (kbytes, -d) unlimited
> scheduling priority (-e) 0
> file size (blocks, -f) unlimited
> pending signals (-i) 256726
> max locked memory (kbytes, -l) 64
> max memory size (kbytes, -m) unlimited
> open files (-n) 1048576
> pipe size (512 bytes, -p) 8
> POSIX message queues (bytes, -q) 819200
> real-time priority (-r) 0
> stack size (kbytes, -s) 8192
> cpu time (seconds, -t) unlimited
> max user processes (-u) 128000
> virtual memory (kbytes, -v) unlimited
> file locks (-x) unlimited
>  
> {code}
>  
> [^flink_open_files.txt]
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for 
> KeyedCoProcessOperator_98a16ed3228ec4a08acd8d78420516a1_(1/1) from any of the 
> 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
>   ... 5 more
> Caused by: java.io.FileNotFoundException: 
> /tmp/flink-io-3da06c9e-f619-44c9-b95f-54ee9b1a084a/job_b3ecbdc0eb2dc2dfbf5532ec1fcef9da_op_KeyedCoProcessOperator_98a16ed3228ec4a08acd8d78420516a1__1_1__uuid_c4b82a7e-8a04-4704-9e0b-393c3243cef2/3701639a-bacd-4861-99d8-5f3d112e88d6/16.sst
>  (Too many open files)
>   at java.io.FileOutputStream.open0(Native Method)
>   at java.io.FileOutputStream.open(FileOutputStream.java:270)
>   at java.io.FileOutputStream.(FileOutputStream.java:213)
>   at java.io.FileOutputStream.(FileOutputStream.java:162)
>   at 
> org.apache.flink.core.fs.local.LocalDataOutputStream.(LocalDataOutputStream.java:47)
>   at 
> org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:275)
>   at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.copyStateDataHandleData(RocksDBKeyedStateBackend.java:1008)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllDataFromStateHandles(RocksDBKeyedStateBackend.java:988)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllStateDataToDirectory(RocksDBKeyedStateBackend.java:973)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:758)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:732)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:443)
>   at 
> 

[jira] [Closed] (FLINK-9945) RocksDB state backend Checkpointing Failed

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-9945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-9945.
---
Resolution: Information Provided

Closing this due to lack of activity.

> RocksDB state backend Checkpointing Failed
> --
>
> Key: FLINK-9945
> URL: https://issues.apache.org/jira/browse/FLINK-9945
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.4.2
>Reporter: xymaqingxiang
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
> Attachments: image-2018-07-25-16-57-45-617.png, 
> image-2018-07-26-18-31-21-429.png
>
>
> Checkpoint failed.
> The log is:
> !image-2018-07-25-16-57-45-617.png!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-10040) Decompose RocksDBKeyedStateBackend

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-10040.

Resolution: Resolved

All sub-tasks have been resolved.

> Decompose RocksDBKeyedStateBackend
> --
>
> Key: FLINK-10040
> URL: https://issues.apache.org/jira/browse/FLINK-10040
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stefan Richter
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> Over time, the class {{RocksDBKeyedStateBackend}} has grown to around 3000 
> LOC, with a lot of inner classes (different checkpoint/restore strategies, 
> iterators, etc).
> I suggest to decompose it and improve how dependencies to the classes are 
> provided in construction, so that things become better to maintain, extend, 
> and test.
> Will break down this work into multiple tasks, ordered by estimated 
> difficulty/time required.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-10297) PostVersionedIOReadableWritable ignores result of InputStream.read(...)

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-10297.

Resolution: Information Provided

Closing this as already resolved by FLINK-19300.

> PostVersionedIOReadableWritable ignores result of InputStream.read(...)
> ---
>
> Key: FLINK-10297
> URL: https://issues.apache.org/jira/browse/FLINK-10297
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.4.2, 1.5.3, 1.6.0
>Reporter: Stefan Richter
>Priority: Minor
>  Labels: auto-deprioritized-critical, auto-deprioritized-major
>
> PostVersionedIOReadableWritable ignores result of {{InputStream.read(...)}}. 
> Probably the intention was to invoke {{readFully}}. As it is now, this can 
> lead to a corrupted deserialization.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-10372) There is no API to configure the timer state backend

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-10372.

Resolution: Duplicate

> There is no API to configure the timer state backend
> 
>
> Key: FLINK-10372
> URL: https://issues.apache.org/jira/browse/FLINK-10372
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Runtime / State Backends
>Affects Versions: 1.6.0
>Reporter: Elias Levy
>Priority: Not a Priority
>
> Flink 1.6.0, via FLINK-9485, introduced the option to store timers in RocksDB 
> instead of the heap.  Alas, this can only be configured via the 
> {{state.backend.rocksdb.timer-service.factory}} config file option.  That 
> means that the choice of state backend to use for timer can't be made on a 
> per job basis on a shared cluster.
> There is a need for an API in {{RocksDBStateBackend}} to configure the 
> backend per job.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-10387) StateBackend create methods should return interface not abstract KeyedStateBackend classes

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-10387.

Resolution: Information Provided

The current method already returns {{CheckpointableKeyedStateBackend}} directly.

> StateBackend create methods should return interface not abstract 
> KeyedStateBackend classes
> --
>
> Key: FLINK-10387
> URL: https://issues.apache.org/jira/browse/FLINK-10387
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.6.0
>Reporter: Gyula Fora
>Priority: Not a Priority
>
> Currently the createKeyedStateBackend(...) methods return 
> AbstractKeyedStateBackend instead of an interface.
> This makes it virtually impossible to write nice extensions to StateBackends 
> that add additional functionality to existing backends while delegating other 
> method calls.
> It should be easy enough to add a new interface that extends everything that 
> the AbstractKeyedStateBackend does and the method should return that.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-10950) RocksDB backend does not work on Windows due to path issue

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-10950.

Resolution: Duplicate

> RocksDB backend does not work on Windows due to path issue
> --
>
> Key: FLINK-10950
> URL: https://issues.apache.org/jira/browse/FLINK-10950
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.6.2
>Reporter: Adam Laczynski
>Priority: Not a Priority
>
> For -Djava.io.tmpdir=C:/some/path/tmp
> Getting following error when running job from IDE (local env):
> Caused by: org.rocksdb.RocksDBException: Failed to create dir: 
> /C:/some/path/tmp/flink-io-2a68cdeb-8474-487d-8e61-393dde20a2af/job_e0cc28012d2aca77290bb3880c452f66_op_WindowOperator_700a073d2ada22dc548debf1e8f75ec2__1_1__uuid_de037d5c-4a38-4c71-8133-9b208ce54b23/chk-1.tmp:
>  Invalid argument
>  at org.rocksdb.Checkpoint.createCheckpoint(Native Method)
>  at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.takeSnapshot(RocksDBKeyedStateBackend.java:2549)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$IncrementalSnapshotStrategy.performSnapshot(RocksDBKeyedStateBackend.java:2008)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:498)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:383)
>  ... 13 more



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-10949) When use flink-1.6.2's intervalJoin funtion, the thread is stucked in rockdb's seek for too long time

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-10949.

Resolution: Information Provided

Closing this ticket due to lack of activity.

> When use flink-1.6.2's intervalJoin funtion, the thread is stucked in 
> rockdb's seek for too long time
> -
>
> Key: FLINK-10949
> URL: https://issues.apache.org/jira/browse/FLINK-10949
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.6.2
> Environment: flink1.6.2, linux
>Reporter: Liu
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> features, performance
>
>  I am using IntervalJoin function to join two streams within 10 minutes. As 
> below:
>  
>  labelStream.intervalJoin(adLogStream)
>  .between(Time.milliseconds(0), Time.milliseconds(60))
>  .process(new processFunction())
>  .sink(kafkaProducer)
>  labelStream and adLogStream are proto-buf classes that are keyed by Long id.
> Our two input-streams are huge. After running about 30 minutes, the output to 
> kafka goes down slowly, like this: 
> !https://i.stack.imgur.com/UW4V1.png!
> When data output begins going down, I use jstack and pstack several times to 
> get these: 
> !https://i.stack.imgur.com/uxOZn.png!
> !https://i.stack.imgur.com/JTyIC.png!
> It seems the program is stuck in RocksDB's seek. And I find that some of 
> RocksDB's sst files are accessed slowly by iteration. 
> [!https://i.stack.imgur.com/Avdyo.png!|https://i.stack.imgur.com/Avdyo.png]
> I have tried several ways:
> 1) Reduce the input amount to half. This works well.
> 2) Replace labelStream and adLogStream with simple Strings. This way, the data 
> amount will not change. This works well.
> 3) Use PredefinedOptions like SPINNING_DISK_OPTIMIZED and 
> SPINNING_DISK_OPTIMIZED_HIGH_MEM. This still fails.
> 4) Use new versions of rocksdbjni. This still fails.
> Can anyone give me some suggestions? Thank you very much.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-14197) Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-14197.

Resolution: Information Provided

> Increasing trend for state size of keyed stream using ProcessWindowFunction 
> with ProcessingTimeSessionWindows
> -
>
> Key: FLINK-14197
> URL: https://issues.apache.org/jira/browse/FLINK-14197
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.9.0
> Environment: Tested with:
>  * Local Flink Mini Cluster running from IDE
>  * Flink standalone cluster run in docker
>Reporter: Oliver Kostera
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> I'm using *ProcessWindowFunction* in a keyed stream with the following 
> definition:
> {code:java}
> final SingleOutputStreamOperator processWindowFunctionStream 
> =
> 
> keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
> .process(new 
> CustomProcessWindowFunction()).uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
> .name("Process window function");
> {code}
> My checkpointing configuration is set to use RocksDB state backend with 
> incremental checkpointing and EXACTLY_ONCE mode.
> In a runtime I noticed that even though data ingestion is static - same keys 
> and frequency of messages the size of the process window operator keeps 
> increasing. I tried to reproduce it with minimal similar setup here: 
> https://github.com/loliver1234/flink-process-window-function and was 
> successful to do so.
> Testing conditions:
> - RabbitMQ source with Exactly-once guarantee and 65k prefetch count
> - RabbitMQ sink to collect messages
> - Simple ProcessWindowFunction that only pass messages through
> - Stream time characteristic set to TimeCharacteristic.ProcessingTime
> Testing scenario:
> - Start flink job and check initial state size - State Size: 127 KB
> - Start sending messages, 1000 same unique keys every 1s (they are not 
> falling into defined time window gap set to 100ms, each message should create 
> new window)
> - State of the process window operator keeps increasing - after 1mln messages 
> state ended up to be around 2mb
> - Stop sending messages and wait till rabbit queue is fully consumed and few 
> checkpoints go by
> - Was expected to see state size to decrease to base value but it stayed at 
> 2mb
> - Continue to send messages with the same keys and state kept increasing 
> trend.
> What I checked:
> - Registration and deregistration of timestamps set for time windows - each 
> registration matched its deregistration
> - Checked that in fact there are no window merges
> - Tried custom Trigger disabling window merges and setting onProcessingTime 
> trigger to TriggerResult.FIRE_AND_PURGE - same state behavior
> On staging environment, we noticed that state for that operator keeps 
> increasing indefinitely, after some months reaching even 1,5gb for 100k 
> unique keys
> Flink commit id: 9c32ed9
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-14197) Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows

2021-12-08 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17455040#comment-17455040
 ] 

Yun Tang commented on FLINK-14197:
--

[~loliver] Do you still encounter this problem?

If the state size is still under 100MB, I think this behavior is by design for 
the RocksDB state backend. RocksDB relies on level compaction to delete old, 
outdated data, which means that if the state size never reaches the level-size 
thresholds (level-0 compacts at 4 files, level-1 at 256MB and level-2 at 
2560MB by default), you will not see the state size decrease.
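To make the comment above concrete, here is a small arithmetic sketch of those thresholds (class and method names are illustrative; the default values are those quoted above for RocksDB's `level0_file_num_compaction_trigger` and `max_bytes_for_level_base`, and this is not Flink or RocksDB code):

```java
public class CompactionThresholdSketch {
    // Assumed defaults: level-0 compacts at 4 files, level-1 targets 256MB.
    static boolean level0CompactionTriggered(int level0Files) {
        return level0Files >= 4; // level0_file_num_compaction_trigger default
    }

    static boolean level1CompactionTriggered(long level1Bytes) {
        return level1Bytes >= 256L * 1024 * 1024; // max_bytes_for_level_base default
    }

    public static void main(String[] args) {
        // A ~2MB state stays far below both thresholds, so compaction never
        // runs, stale entries are not dropped, and the reported state size
        // cannot shrink back to its baseline.
        if (level0CompactionTriggered(2)) throw new AssertionError();
        if (level1CompactionTriggered(2L * 1024 * 1024)) throw new AssertionError();
    }
}
```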

> Increasing trend for state size of keyed stream using ProcessWindowFunction 
> with ProcessingTimeSessionWindows
> -
>
> Key: FLINK-14197
> URL: https://issues.apache.org/jira/browse/FLINK-14197
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.9.0
> Environment: Tested with:
>  * Local Flink Mini Cluster running from IDE
>  * Flink standalone cluster run in docker
>Reporter: Oliver Kostera
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> I'm using *ProcessWindowFunction* in a keyed stream with the following 
> definition:
> {code:java}
> final SingleOutputStreamOperator processWindowFunctionStream 
> =
> 
> keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
> .process(new 
> CustomProcessWindowFunction()).uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
> .name("Process window function");
> {code}
> My checkpointing configuration is set to use RocksDB state backend with 
> incremental checkpointing and EXACTLY_ONCE mode.
> In a runtime I noticed that even though data ingestion is static - same keys 
> and frequency of messages the size of the process window operator keeps 
> increasing. I tried to reproduce it with minimal similar setup here: 
> https://github.com/loliver1234/flink-process-window-function and was 
> successful to do so.
> Testing conditions:
> - RabbitMQ source with Exactly-once guarantee and 65k prefetch count
> - RabbitMQ sink to collect messages
> - Simple ProcessWindowFunction that only passes messages through
> - Stream time characteristic set to TimeCharacteristic.ProcessingTime
> Testing scenario:
> - Start flink job and check initial state size - State Size: 127 KB
> - Start sending messages, 1000 same unique keys every 1s (they are not 
> falling into defined time window gap set to 100ms, each message should create 
> new window)
> - State of the process window operator keeps increasing - after 1mln messages 
> state ended up to be around 2mb
> - Stop sending messages and wait till rabbit queue is fully consumed and few 
> checkpoints go by
> - Was expected to see state size to decrease to base value but it stayed at 
> 2mb
> - Continue to send messages with the same keys; the state keeps its 
> increasing trend.
> What I checked:
> - Registration and deregistration of timestamps set for time windows - each 
> registration matched its deregistration
> - Checked that in fact there are no window merges
> - Tried custom Trigger disabling window merges and setting onProcessingTime 
> trigger to TriggerResult.FIRE_AND_PURGE - same state behavior
> On the staging environment, we noticed that the state for that operator keeps 
> increasing indefinitely, after some months reaching 1.5GB for 100k 
> unique keys
> Flink commit id: 9c32ed9
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-14379) Supplement documentation about raw state

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-14379.

Fix Version/s: (was: 1.15.0)
   Resolution: Information Provided

The latest documentation has removed the description of raw state. Since we 
decided not to make users aware of this concept, there is no need to add 
detailed documentation.

> Supplement documentation about raw state
> 
>
> Key: FLINK-14379
> URL: https://issues.apache.org/jira/browse/FLINK-14379
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / State Backends
>Reporter: Yun Tang
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> Currently, we only have very simple 
> [documentation|https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#raw-and-managed-state]
>  or even we could say only one sentence to talk about raw state. It might 
> lead beginner of Flink feel not so clear about this concept, I think we 
> should supplement documentation about raw state.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-14833) Remove hierachy of SnapshotStrategySynchronicityBehavior in HeapSnapshotStrategy

2021-12-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-14833.

Resolution: Information Provided

Closing this ticket as the related code has been refactored.

>  Remove hierachy of SnapshotStrategySynchronicityBehavior in 
> HeapSnapshotStrategy
> -
>
> Key: FLINK-14833
> URL: https://issues.apache.org/jira/browse/FLINK-14833
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.9.1
>Reporter: Jiayi Liao
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Since all methods implemented from {{SnapshotStrategySynchronicityBehavior}} 
> in {{HeapSnapshotStrategy}} follow the same delegation pattern below: 
> {code:java}
> @Override
> public void finalizeSnapshotBeforeReturnHook(Runnable runnable) {
>
> snapshotStrategySynchronicityTrait.finalizeSnapshotBeforeReturnHook(runnable);
> }
> @Override
> public boolean isAsynchronous() {
>return snapshotStrategySynchronicityTrait.isAsynchronous();
> }
> @Override
> public  StateTable newStateTable(
>InternalKeyContext keyContext,
>RegisteredKeyValueStateBackendMetaInfo newMetaInfo,
>TypeSerializer keySerializer) {
>return snapshotStrategySynchronicityTrait.newStateTable(keyContext, 
> newMetaInfo, keySerializer);
> }
> {code}
> It looks like implementing the {{SnapshotStrategySynchronicityBehavior}} 
> interface is not necessary for {{HeapSnapshotStrategy}}, and we can simply 
> remove it together with the related {{@Override}} annotations. Also, 
> {{HeapSnapshotStrategy}} does not match the Javadocs in 
> {{SnapshotStrategySynchronicityBehavior}}.
>  
> And please correct me if there is a reason here.
>  
> cc [~liyu]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-18165) When savingpoint is restored, select the checkpoint directory and stateBackend

2021-12-07 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-18165.

Resolution: Fixed

This ticket has been resolved by FLINK-20976

> When savingpoint is restored, select the checkpoint directory and stateBackend
> --
>
> Key: FLINK-18165
> URL: https://issues.apache.org/jira/browse/FLINK-18165
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.9.0
> Environment: flink 1.9
>Reporter: Xinyuan Liu
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> If a checkpoint file is used as the initial state for a savepoint-style 
> restore, the state backend used before and after must be of the same type. In 
> actual production, however, state keeps growing, the taskmanager memory 
> becomes insufficient, the cluster cannot be expanded, and the state backend 
> needs to be switched while data consistency is preserved. Unfortunately, 
> Flink currently does not provide an elegant way to switch the state backend; 
> could the community consider this proposal?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-19008) Flink Job runs slow after restore + downscale from an incremental checkpoint (rocksdb)

2021-12-07 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-19008.

Resolution: Information Provided

Flink has bumped the RocksDB version to 6.20.3, which adopts this compaction 
priority as the default. However, I don't think this change alone will improve 
the performance behavior much. Let's see whether the problem still exists after 
Flink 1.14.

> Flink Job runs slow after restore + downscale from an incremental checkpoint 
> (rocksdb)
> --
>
> Key: FLINK-19008
> URL: https://issues.apache.org/jira/browse/FLINK-19008
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Jun Qin
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> perfomance, usability
>
> A customer runs a Flink job with RocksDB state backend. Checkpoints are 
> retained and done incrementally. The state size is several TB. When they 
> restore + downscale from a retained checkpoint, although the downloading of 
> checkpoint files took ~20min, the job throughput returns to the expected 
> level only after 3 hours.  
> I do not have RocksDB logs. The suspicion is that those 3 hours are due to heavy 
> RocksDB compaction and/or flushes, as it was observed that checkpoints could not 
> finish fast enough due to a long {{checkpoint duration (sync)}}. How can we 
> make this restore phase shorter? 
> For compaction, I think it is worth checking the improvement of:
> {code:c}
> CompactionPri compaction_pri = kMinOverlappingRatio;{code}
> which has been set to default in RocksDB 6.x:
> {code:c}
> // In Level-based compaction, it Determines which file from a level to be
> // picked to merge to the next level. We suggest people try
> // kMinOverlappingRatio first when you tune your database.
> enum CompactionPri : char {
>   // Slightly prioritize larger files by size compensated by #deletes
>   kByCompensatedSize = 0x0,
>   // First compact files whose data's latest update time is oldest.
>   // Try this if you only update some hot keys in small ranges.
>   kOldestLargestSeqFirst = 0x1,
>   // First compact files whose range hasn't been compacted to the next level
>   // for the longest. If your updates are random across the key space,
>   // write amplification is slightly better with this option.
>   kOldestSmallestSeqFirst = 0x2,
>   // First compact files whose ratio between overlapping size in next level
>   // and its size is the smallest. It in many cases can optimize write
>   // amplification.
>   kMinOverlappingRatio = 0x3,
> };
> ...
> // Default: kMinOverlappingRatio  
> CompactionPri compaction_pri = kMinOverlappingRatio;{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-20996) Using a cryptographically weak Pseudo Random Number Generator (PRNG)

2021-12-07 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17455009#comment-17455009
 ] 

Yun Tang commented on FLINK-20996:
--

[~yaxiao] AbstractTtlStateVerifier is a test-only class that never runs in a 
production environment, so I don't see how it could be attacked in a security 
context.
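For background on the report itself, the behavioral difference between the two classes can be shown with plain JDK code (a generic illustration, unrelated to Flink code):

```java
import java.security.SecureRandom;
import java.util.Random;

public class RandomComparison {
    public static void main(String[] args) {
        // java.util.Random: same seed -> identical sequence, so anyone who
        // learns (or guesses) the seed can predict every future value.
        Random a = new Random(42L);
        Random b = new Random(42L);
        if (a.nextLong() != b.nextLong()) throw new AssertionError();

        // SecureRandom is seeded from an OS entropy source and is the right
        // choice when values must be unpredictable (keys, session tokens).
        SecureRandom strong = new SecureRandom();
        byte[] token = new byte[16];
        strong.nextBytes(token);
        if (token.length != 16) throw new AssertionError();
    }
}
```

In test code such as AbstractTtlStateVerifier, predictability is harmless (even useful for reproducibility), which is why the replacement is unnecessary there.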

> Using a cryptographically weak Pseudo Random Number Generator (PRNG)
> 
>
> Key: FLINK-20996
> URL: https://issues.apache.org/jira/browse/FLINK-20996
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ya Xiao
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> We are a security research team at Virginia Tech. We are doing an empirical 
> study about the usefulness of the existing security vulnerability detection 
> tools. The following is a reported vulnerability by certain tools. We'll so 
> appreciate it if you can give any feedback on it.
> *Vulnerability Description:*
> {color:#172b4d}In file 
> {color}[flink/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java,|https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java]
>  use java.util.Random instead of java.security.SecureRandom at Line 39.
> *Security Impact:*
> Java.util.Random is not cryptographically strong and may expose sensitive 
> information to certain types of attacks when used in a security context.
> *Useful Resources*:
> [https://cwe.mitre.org/data/definitions/338.html]
> *Solution we suggest:*
> Replace it with SecureRandom
> *Please share with us your opinions/comments if there is any:*
> Is the bug report helpful?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-21726) Fix checkpoint stuck

2021-12-07 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-21726.

Fix Version/s: (was: 1.15.0)
   (was: 1.14.1)
   Resolution: Information Provided

Since we have already bumped the RocksDB version, the fix for this problem is 
included in that version.

> Fix checkpoint stuck
> 
>
> Key: FLINK-21726
> URL: https://issues.apache.org/jira/browse/FLINK-21726
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.3, 1.12.2, 1.13.0
>Reporter: future
>Priority: Minor
>  Labels: auto-deprioritized-critical, auto-deprioritized-major
>
> h1. 1. Bug description:
> When RocksDB Checkpoint, it may be stuck in 
> `WaitUntilFlushWouldNotStallWrites` method.
> h1. 2. Simple analysis of the reasons:
> h2. 2.1 Configuration parameters:
>  
> {code:java}
> # Flink yaml:
> state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM
> state.backend.rocksdb.compaction.style: UNIVERSAL
> # corresponding RocksDB config
> Compaction Style : Universal 
> max_write_buffer_number : 4
> min_write_buffer_number_to_merge : 3{code}
> Checkpoints are usually very fast. When a checkpoint is executed, 
> `WaitUntilFlushWouldNotStallWrites` is called. If there are 2 immutable 
> MemTables, fewer than `min_write_buffer_number_to_merge`, they will not be 
> flushed, but the following code is still entered.
>  
> {code:java}
> // method: GetWriteStallConditionAndCause
> if (mutable_cf_options.max_write_buffer_number> 3 &&
>   num_unflushed_memtables >=
>   mutable_cf_options.max_write_buffer_number-1) {
>  return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
> }
> {code}
> code link: 
> [https://github.com/facebook/rocksdb/blob/fbed72f03c3d9e4fdca3e5993587ef2559ba6ab9/db/column_family.cc#L847]
> The checkpoint thread assumed there was a flush job, but there wasn't, so it waits forever.
> h2. 2.2 solution:
> Tighten the condition: wait only when the number of immutable MemTables >= 
> `min_write_buffer_number_to_merge`.
> The rocksdb community has fixed this bug, link: 
> [https://github.com/facebook/rocksdb/pull/7921]
> h2. 2.3 Code that can reproduce the bug:
> [https://github.com/1996fanrui/fanrui-learning/blob/flink-1.12/module-java/src/main/java/com/dream/rocksdb/RocksDBCheckpointStuck.java]
> h1. 3. Interesting point
> This bug will be triggered only when `the number of sorted runs >= 
> level0_file_num_compaction_trigger`.
> Because there is a break in WaitUntilFlushWouldNotStallWrites.
> {code:java}
> if (cfd->imm()->NumNotFlushed() <
> cfd->ioptions()->min_write_buffer_number_to_merge &&
> vstorage->l0_delay_trigger_count() <
> mutable_cf_options.level0_file_num_compaction_trigger) {
>   break;
> }
> {code}
> code link: 
> [https://github.com/facebook/rocksdb/blob/fbed72f03c3d9e4fdca3e5993587ef2559ba6ab9/db/db_impl/db_impl_compaction_flush.cc#L1974]
> Universal may have `l0_delay_trigger_count() >= 
> level0_file_num_compaction_trigger`, so this bug is triggered.
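The two RocksDB conditions quoted above can be condensed into a small sketch (method and parameter names are mine; the logic paraphrases the linked rocksdb sources, and this is not Flink code):

```java
public class StallCheckSketch {
    // GetWriteStallConditionAndCause: reports a "delayed" stall once the
    // number of unflushed memtables reaches max_write_buffer_number - 1.
    static boolean wouldReportStall(int numUnflushed, int maxWriteBufferNumber) {
        return maxWriteBufferNumber > 3 && numUnflushed >= maxWriteBufferNumber - 1;
    }

    // WaitUntilFlushWouldNotStallWrites: breaks out of the wait only if there
    // are too few immutable memtables to merge AND level-0 is not at its
    // compaction trigger; otherwise it keeps waiting for a flush job that,
    // with min_write_buffer_number_to_merge=3 and only 2 immutable memtables,
    // will never be scheduled -- hence the stuck checkpoint.
    static boolean wouldStopWaiting(int numUnflushed, int minToMerge,
                                    int l0DelayTriggerCount, int l0CompactionTrigger) {
        return numUnflushed < minToMerge && l0DelayTriggerCount < l0CompactionTrigger;
    }

    public static void main(String[] args) {
        // The configuration from the report: a stall is reported...
        if (!wouldReportStall(3, 4)) throw new AssertionError();
        // ...but with universal compaction at its level-0 trigger, the wait
        // loop never breaks out, so the thread blocks indefinitely.
        if (wouldStopWaiting(2, 3, 4, 4)) throw new AssertionError();
    }
}
```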



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-22962) Key group is not in KeyGroupRange error while checkpointing

2021-12-07 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17454991#comment-17454991
 ] 

Yun Tang commented on FLINK-22962:
--

[~prateekkohli2112] Since Flink 1.13, we have unified the savepoint format of 
the RocksDB and heap keyed state backends. Do you still face this problem?
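As background for readers hitting this error, here is a simplified sketch of key-group assignment (names are illustrative, not the Flink API; Flink's actual implementation additionally applies a murmur hash to the key's hashCode before taking the modulo):

```java
public class KeyGroupSketch {
    // Simplified: map a key to one of maxParallelism key groups.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each operator subtask owns a contiguous range [start, end] of key
    // groups. "Key group X is not in KeyGroupRange" means a state entry was
    // written under a key group outside the subtask's own range -- typically
    // caused by a key type whose hashCode() is mutable or non-deterministic.
    static boolean inRange(int keyGroup, int start, int end) {
        return keyGroup >= start && keyGroup <= end;
    }

    public static void main(String[] args) {
        int kg = keyGroupFor("sensor-1", 128);
        if (kg < 0 || kg >= 128) throw new AssertionError();
        if (!inRange(6, 5, 7)) throw new AssertionError();
        if (inRange(0, 5, 7)) throw new AssertionError();
    }
}
```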



> Key group is not in KeyGroupRange error while checkpointing
> ---
>
> Key: FLINK-22962
> URL: https://issues.apache.org/jira/browse/FLINK-22962
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.12.1
> Environment: Linux
>Reporter: Prateek Kohli
>Priority: Major
>
> Hi,
>  
> We are getting the below exception while using rocksdb as state backend at 
> the time of checkpointing:
> 2021-06-10 12:05:13,933 INFO 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - 
> Aggregator (3/4)#0 - asynchronous part of checkpoint 2 could not be completed.
> java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: 
> Key group 0 is not in KeyGroupRange\{startKeyGroup=5, endKeyGroup=7}.
>  at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_261]
>  at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_261]
>  at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:621)
>  ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>  at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:54)
>  ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>  at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:122)
>  [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_261]
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_261]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
> Caused by: java.lang.IllegalArgumentException: Key group 0 is not in 
> KeyGroupRange\{startKeyGroup=5, endKeyGroup=7}.
>  at 
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:144)
>  ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>  at 
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:106)
>  ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>  at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:333)
>  ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>  at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:264)
>  ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>  at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:227)
>  ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>  at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:180)
>  ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>  at 
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)
>  ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_261]
>  at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:618)
>  ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>  ... 5 more
>  
> When we change the state backend to file or heap we do not get this error.
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-25195) Use duplicating API for shared artefacts in RocksDB snapshots

2021-12-06 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-25195:


Assignee: Yun Tang

> Use duplicating API for shared artefacts in RocksDB snapshots
> -
>
> Key: FLINK-25195
> URL: https://issues.apache.org/jira/browse/FLINK-25195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Dawid Wysakowicz
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.15.0
>
>
> Instead of uploading all artefacts, we could use the duplicating API to 
> cheaply create an independent copy of shared artefacts instead of uploading 
> them again (as described in FLINK-25192)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-24479) Bug fix RocksIteratorWrapper#seekToLast method

2021-12-06 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-24479.

Resolution: Information Provided

> Bug fix RocksIteratorWrapper#seekToLast method
> --
>
> Key: FLINK-24479
> URL: https://issues.apache.org/jira/browse/FLINK-24479
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.0
>Reporter: xiangqiao
>Priority: Major
>  Labels: pull-request-available
>
> RocksIteratorWrapper#seekToLast method, calling RocksIterator#seekToFirst 
> method incorrectly.
> The RocksIteratorWrapper#seekToLast method is not currently used, so it has 
> no impact on the flink project.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24479) Bug fix RocksIteratorWrapper#seekToLast method

2021-12-06 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17454001#comment-17454001
 ] 

Yun Tang commented on FLINK-24479:
--

Sorry for overlooking this ticket; this bug was already resolved in FLINK-24432. 
Still, thanks for your enthusiasm to contribute to the Flink project. Next time, 
you can directly ping a Flink committer who focuses on the related module in the 
comments, to avoid the ticket being overlooked.

> Bug fix RocksIteratorWrapper#seekToLast method
> --
>
> Key: FLINK-24479
> URL: https://issues.apache.org/jira/browse/FLINK-24479
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.0
>Reporter: xiangqiao
>Priority: Major
>  Labels: pull-request-available
>
> RocksIteratorWrapper#seekToLast method, calling RocksIterator#seekToFirst 
> method incorrectly.
> The RocksIteratorWrapper#seekToLast method is not currently used, so it has 
> no impact on the flink project.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (FLINK-25163) Add more options for rocksdb state backend to make configuration more flexible

2021-12-06 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-25163.
--
Fix Version/s: 1.15.0
   Resolution: Fixed

Merged in master: ab50693c2d67dd8259cdabd641732575319592f7

> Add more options for rocksdb state backend to make configuration more flexible
> --
>
> Key: FLINK-25163
> URL: https://issues.apache.org/jira/browse/FLINK-25163
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: 刘方奇
>Assignee: 刘方奇
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Now Flink exposes fewer options than the configuration RocksDB can accept. 
> There are many functions in org.rocksdb.DBOptions that can influence its 
> behavior (e.g. RocksDB background threads).
> This limits what we can do when we want to tune RocksDB. In my opinion, there 
> are at least three options:
>  # maxBackgroundFlushes: defines the background flush threads, default 1.
>  # maxBackgroundCompactions: defines the background compaction threads, 
> default 1.
>  # maxBackgroundJobs: defines the background threads, default 2.
> setIncreaseParallelism (the most we can currently do for the RocksDB 
> backend's background threads) seems to help little, since it cannot change 
> the flush threads. I think it's necessary to make the RocksDB configuration 
> flexible.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (FLINK-24777) Processed (persisted) in-flight data description miss on Monitoring Checkpointing page

2021-12-06 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-24777.
--
Fix Version/s: 1.15.0
   1.14.1
   Resolution: Fixed

Merged
master: 5f1f480978811f3dcf2b0f9ea118a5d9068db5fb
release-1.14: ef0e17ad6319175ce0054fc3c4db14b78e690dd6

> Processed (persisted) in-flight data description miss on Monitoring 
> Checkpointing page
> --
>
> Key: FLINK-24777
> URL: https://issues.apache.org/jira/browse/FLINK-24777
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: camilesing
>Assignee: camilesing
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1
>
> Attachments: image-2021-11-05-00-10-08-081.png
>
>
> !image-2021-11-05-00-10-08-081.png!
> The "Processed (persisted) in-flight data" description is missing; we need to 
> merge the "Processed in-flight data" and "Persisted in-flight data" descriptions. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (FLINK-21027) Add isStateKeyValueSerialized() method to KeyedStateBackend interface

2021-12-06 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-21027.
--
Resolution: Fixed

Merged in master:
bd4b0aa2ace713ca81ab3aa1ffe3bd86822d365c

> Add isStateKeyValueSerialized() method to KeyedStateBackend interface
> -
>
> Key: FLINK-21027
> URL: https://issues.apache.org/jira/browse/FLINK-21027
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Jark Wu
>Assignee: Yun Tang
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned, 
> pull-request-available
> Fix For: 1.15.0
>
>
> In Table/SQL operators, we have some optimizations that reuse objects of keys 
> and records. For example, we buffer input records in {{BytesMultiMap}} and 
> use the reused object to map to the underlying memory segment to reduce bytes 
> copy. 
> However, if we put the reused key and value into Heap statebackend, the 
> result will be wrong, because it is not allowed to mutate keys and values in 
> Heap statebackend. 
> Therefore, it would be great if {{KeyedStateBackend}} can expose such API, so 
> that Table/SQL can dynamically decide whether to copy the keys and values 
> before putting into state. 
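The hazard described above can be reproduced with plain JDK collections (illustrative only, not the Flink API): a heap-style backend keeps a reference to the reused key object, while a serializing backend stores an immutable copy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ObjectReuseSketch {
    public static void main(String[] args) {
        // "Heap backend": stores the live reference to the key object.
        Map<List<String>, Integer> heapStyle = new HashMap<>();
        List<String> reusedKey = new ArrayList<>();
        reusedKey.add("a");
        heapStyle.put(reusedKey, 1);

        // "Serializing backend": copying to a string stands in for
        // serializing the key to bytes before storing it.
        Map<String, Integer> serializedStyle = new HashMap<>();
        serializedStyle.put(reusedKey.toString(), 1);

        // The operator reuses the key object for the next record...
        reusedKey.set(0, "b");

        // ...corrupting the heap-style entry: the original key is gone.
        if (heapStyle.get(List.of("a")) != null) throw new AssertionError();
        // The serializing backend is unaffected, because it stored a copy.
        if (serializedStyle.get("[a]") != 1) throw new AssertionError();
    }
}
```

An {{isStateKeyValueSerialized()}} flag lets the Table/SQL runtime copy keys and values only when the backend does not serialize them itself.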



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-23493) python tests hang on Azure

2021-12-05 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17453794#comment-17453794
 ] 

Yun Tang commented on FLINK-23493:
--

Another instance 
https://myasuka.visualstudio.com/flink/_build/results?buildId=358=logs=fba17979-6d2e-591d-72f1-97cf42797c11=727942b6-6137-54f7-1ef9-e66e706ea068

> python tests hang on Azure
> --
>
> Key: FLINK-23493
> URL: https://issues.apache.org/jira/browse/FLINK-23493
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.14.0, 1.13.1, 1.12.4, 1.15.0
>Reporter: Dawid Wysakowicz
>Assignee: Huang Xingbo
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.15.0, 1.14.1
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=20898=logs=821b528f-1eed-5598-a3b4-7f748b13f261=4fad9527-b9a5-5015-1b70-8356e5c91490=22829



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25163) Add more options for rocksdb state backend to make configuration more flexible

2021-12-05 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17453775#comment-17453775
 ] 

Yun Tang commented on FLINK-25163:
--

[~liufangqi] already assigned to you, please go ahead.

> Add more options for rocksdb state backend to make configuration more flexible
> --
>
> Key: FLINK-25163
> URL: https://issues.apache.org/jira/browse/FLINK-25163
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: 刘方奇
>Assignee: 刘方奇
>Priority: Major
>
> Now Flink exposes fewer options than the configuration RocksDB can accept. 
> There are many functions in org.rocksdb.DBOptions that can influence its 
> behavior (e.g. RocksDB background threads).
> This limits what we can do when we want to tune RocksDB. In my opinion, there 
> are at least three options:
>  # maxBackgroundFlushes: defines the background flush threads, default 1.
>  # maxBackgroundCompactions: defines the background compaction threads, 
> default 1.
>  # maxBackgroundJobs: defines the background threads, default 2.
> setIncreaseParallelism (the most we can currently do for the RocksDB 
> backend's background threads) seems to help little, since it cannot change 
> the flush threads. I think it's necessary to make the RocksDB configuration 
> flexible.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-25163) Add more options for rocksdb state backend to make configuration more flexible

2021-12-05 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-25163:


Assignee: 刘方奇

> Add more options for rocksdb state backend to make configuration more flexible
> --
>
> Key: FLINK-25163
> URL: https://issues.apache.org/jira/browse/FLINK-25163
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: 刘方奇
>Assignee: 刘方奇
>Priority: Major
>
> Now Flink exposes fewer options than the configuration RocksDB can accept. 
> There are many functions in org.rocksdb.DBOptions that can influence its 
> behavior (e.g. RocksDB background threads).
> This limits what we can do when we want to tune RocksDB. In my opinion, there 
> are at least three options:
>  # maxBackgroundFlushes: defines the background flush threads, default 1.
>  # maxBackgroundCompactions: defines the background compaction threads, 
> default 1.
>  # maxBackgroundJobs: defines the background threads, default 2.
> setIncreaseParallelism (the most we can currently do for the RocksDB 
> backend's background threads) seems to help little, since it cannot change 
> the flush threads. I think it's necessary to make the RocksDB configuration 
> flexible.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-25163) Add more options for rocksdb state backend to make configuration more flexible

2021-12-05 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17453766#comment-17453766
 ] 

Yun Tang edited comment on FLINK-25163 at 12/6/21, 3:34 AM:


The methods {{setMaxBackgroundFlushes}} and {{setMaxBackgroundCompactions}} 
have been deprecated in favor of {{setMaxBackgroundJobs}} (see 
https://github.com/facebook/rocksdb/pull/2908). We could drop the usage of 
{{setIncreaseParallelism}} in Flink and use {{setMaxBackgroundJobs}} instead.

[~liufangqi] WDYT?


was (Author: yunta):
The methods {{setMaxBackgroundFlushes}} and {{setMaxBackgroundCompactions}} 
have been deprecated in favor of {{setMaxBackgroundJobs}} (see 
https://github.com/facebook/rocksdb/pull/2908). We could drop the usage of 
{{setIncreaseParallelism}} in Flink and use {{setMaxBackgroundJobs}} instead.

> Add more options for rocksdb state backend to make configuration more flexible
> --
>
> Key: FLINK-25163
> URL: https://issues.apache.org/jira/browse/FLINK-25163
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: 刘方奇
>Priority: Major
>
> Flink currently exposes fewer options than RocksDB itself can be configured with. We 
> can see many functions in org.rocksdb.DBOptions that influence its 
> behavior (e.g. RocksDB background threads).
> This leaves us little room when tuning RocksDB. In my 
> opinion, there are at least three options to add:
>  # maxBackgroundFlushes, which defines the number of background flush threads; default 
> 1.
>  # maxBackgroundCompactions, which defines the number of background compaction threads; 
> default 1.
>  # maxBackgroundJobs, which defines the total number of background threads; default 2.
> setIncreaseParallelism (the most we can currently do for the RocksDB backend's background 
> threads) seems to achieve little: it cannot change the number of flush threads. I think 
> it's necessary to make the RocksDB configuration flexible.





[jira] [Commented] (FLINK-25163) Add more options for rocksdb state backend to make configuration more flexible

2021-12-05 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17453766#comment-17453766
 ] 

Yun Tang commented on FLINK-25163:
--

The methods {{setMaxBackgroundFlushes}} and {{setMaxBackgroundCompactions}} 
have been deprecated in favor of {{setMaxBackgroundJobs}} (see 
https://github.com/facebook/rocksdb/pull/2908). We could drop the usage of 
{{setIncreaseParallelism}} in Flink and use {{setMaxBackgroundJobs}} instead.

> Add more options for rocksdb state backend to make configuration more flexible
> --
>
> Key: FLINK-25163
> URL: https://issues.apache.org/jira/browse/FLINK-25163
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: 刘方奇
>Priority: Major
>
> Flink currently exposes fewer options than RocksDB itself can be configured with. We 
> can see many functions in org.rocksdb.DBOptions that influence its 
> behavior (e.g. RocksDB background threads).
> This leaves us little room when tuning RocksDB. In my 
> opinion, there are at least three options to add:
>  # maxBackgroundFlushes, which defines the number of background flush threads; default 
> 1.
>  # maxBackgroundCompactions, which defines the number of background compaction threads; 
> default 1.
>  # maxBackgroundJobs, which defines the total number of background threads; default 2.
> setIncreaseParallelism (the most we can currently do for the RocksDB backend's background 
> threads) seems to achieve little: it cannot change the number of flush threads. I think 
> it's necessary to make the RocksDB configuration flexible.
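For users who mainly want more RocksDB background threads, Flink already exposes a related knob without writing a custom RocksDBOptionsFactory. A hedged flink-conf.yaml sketch — the option name is taken from the Flink documentation as best I recall it, so please verify it against your Flink version:

```yaml
# Caps RocksDB's shared background thread pool (flush + compaction);
# forwarded internally to RocksDB's parallelism setting.
state.backend.rocksdb.thread.num: 4
```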





[jira] [Resolved] (FLINK-23791) Enable RocksDB log again

2021-12-02 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-23791.
--
Resolution: Fixed

Merged in master:

754914a98df8791693245d889e195818c3bf1e49

> Enable RocksDB log again
> 
>
> Key: FLINK-23791
> URL: https://issues.apache.org/jira/browse/FLINK-23791
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.15.0
>
>
> FLINK-15068 disabled RocksDB's local LOG because the RocksDB version used at 
> the time could not limit the size of the local log files.
> After upgrading to a newer RocksDB version, we can enable the RocksDB log 
> again.
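Once the log is enabled again, it becomes tunable through configuration. A hedged flink-conf.yaml sketch using the log-related options that, to my knowledge, ship alongside this change in 1.15 — verify the option names against the released documentation:

```yaml
state.backend.rocksdb.log.level: INFO_LEVEL           # RocksDB info-log verbosity
state.backend.rocksdb.log.dir: /var/log/flink-rocksdb # where the LOG files are written
state.backend.rocksdb.log.max-file-size: 25MB         # roll the LOG file at this size
state.backend.rocksdb.log.file-num: 4                 # keep at most this many LOG files
```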





[jira] [Updated] (FLINK-23791) Enable RocksDB log again

2021-12-02 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-23791:
-
Fix Version/s: (was: 1.14.1)

> Enable RocksDB log again
> 
>
> Key: FLINK-23791
> URL: https://issues.apache.org/jira/browse/FLINK-23791
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.15.0
>
>
> FLINK-15068 disabled RocksDB's local LOG because the RocksDB version used at 
> the time could not limit the size of the local log files.
> After upgrading to a newer RocksDB version, we can enable the RocksDB log 
> again.





[jira] [Assigned] (FLINK-24320) Show in the Job / Checkpoints / Configuration if checkpoints are incremental

2021-12-02 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-24320:


Assignee: Hangxiang Yu

> Show in the Job / Checkpoints / Configuration if checkpoints are incremental
> 
>
> Key: FLINK-24320
> URL: https://issues.apache.org/jira/browse/FLINK-24320
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Web Frontend
>Affects Versions: 1.13.2
>Reporter: Robert Metzger
>Assignee: Hangxiang Yu
>Priority: Minor
>  Labels: auto-deprioritized-major, beginner-friendly
> Attachments: image-2021-09-17-13-31-02-148.png, 
> image-2021-09-24-10-49-53-657.png
>
>
> It would be nice if the Configuration page would also show if incremental 
> checkpoints are enabled.





[jira] [Assigned] (FLINK-24777) Processed (persisted) in-flight data description miss on Monitoring Checkpointing page

2021-12-01 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-24777:


Assignee: camilesing

> Processed (persisted) in-flight data description miss on Monitoring 
> Checkpointing page
> --
>
> Key: FLINK-24777
> URL: https://issues.apache.org/jira/browse/FLINK-24777
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: camilesing
>Assignee: camilesing
>Priority: Minor
>  Labels: pull-request-available
> Attachments: image-2021-11-05-00-10-08-081.png
>
>
> !image-2021-11-05-00-10-08-081.png!
> The Processed (persisted) in-flight data description is missing; we need to merge 
> the "Processed (persisted) in-flight data" and "Persisted in-flight data" entries.





[jira] [Resolved] (FLINK-24046) Refactor the relationship between PredefinedOptions and RocksDBConfigurableOptions

2021-11-30 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-24046.
--
Resolution: Fixed

merged in master: 2d2d92e9812851091d7cee9c9c1764a0f7b4fdc8

> Refactor the relationship between PredefinedOptions and 
> RocksDBConfigurableOptions
> --
>
> Key: FLINK-24046
> URL: https://issues.apache.org/jira/browse/FLINK-24046
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.1
>
>
> RocksDBConfigurableOptions mainly focuses on the settings of DBOptions and 
> ColumnFamilyOptions. The original design of this class was to let users 
> configure RocksDB via configuration options instead of a programmatically 
> implemented RocksDBOptionsFactory.
> To keep the change minimal, the original options in RocksDBConfigurableOptions 
> had no default values, so that nothing happens in 
> DefaultConfigurableOptionsFactory, just as before.
> However, this makes the meaning of an option without a default value unclear to 
> users, and we could consider changing the relationship between the two classes.





[jira] [Updated] (FLINK-24046) Refactor the relationship between PredefinedOptions and RocksDBConfigurableOptions

2021-11-30 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-24046:
-
Fix Version/s: 1.15.0
   (was: 1.14.1)

> Refactor the relationship between PredefinedOptions and 
> RocksDBConfigurableOptions
> --
>
> Key: FLINK-24046
> URL: https://issues.apache.org/jira/browse/FLINK-24046
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> RocksDBConfigurableOptions mainly focuses on the settings of DBOptions and 
> ColumnFamilyOptions. The original design of this class was to let users 
> configure RocksDB via configuration options instead of a programmatically 
> implemented RocksDBOptionsFactory.
> To keep the change minimal, the original options in RocksDBConfigurableOptions 
> had no default values, so that nothing happens in 
> DefaultConfigurableOptionsFactory, just as before.
> However, this makes the meaning of an option without a default value unclear to 
> users, and we could consider changing the relationship between the two classes.





[jira] [Created] (FLINK-25116) Fabric8FlinkKubeClientITCase hangs on Azure

2021-11-30 Thread Yun Tang (Jira)
Yun Tang created FLINK-25116:


 Summary: Fabric8FlinkKubeClientITCase hangs on Azure
 Key: FLINK-25116
 URL: https://issues.apache.org/jira/browse/FLINK-25116
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Kubernetes, Tests
Reporter: Yun Tang


Instance: 
[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=27208=logs=bea52777-eaf8-5663-8482-18fbc3630e81=b2642e3a-5b86-574d-4c8a-f7e2842bfb14]

 
{code:java}
2021-11-29T13:18:56.6420610Z Nov 29 13:18:56 Invoking mvn with 
'/home/vsts/maven_cache/apache-maven-3.2.5/bin/mvn 
-Dmaven.repo.local=/home/vsts/work/1/.m2/repository 
-Dmaven.wagon.http.pool=false -Dorg.slf4j.simpleLogger.showDateTime=true 
-Dorg.slf4j.simpleLogger.dateTimeFormat=HH:mm:ss.SSS 
-Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn
 --no-snapshot-updates -B -Dhadoop.version=2.8.3 -Dinclude_hadoop_aws 
-Dscala-2.12  --settings 
/home/vsts/work/1/s/tools/ci/google-mirror-settings.xml  test 
-Dlog.dir=/home/vsts/work/_temp/debug_files 
-Dlog4j.configurationFile=file:///home/vsts/work/1/s/flink-end-to-end-tests/../tools/ci/log4j.properties
 -Dtest=org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClientITCase'

2021-11-29T13:19:16.0638794Z Nov 29 13:19:16 [INFO] --- 
maven-surefire-plugin:3.0.0-M5:test (default-test) @ flink-kubernetes ---
2021-11-29T17:10:39.7133994Z 
==
2021-11-29T17:10:39.7134714Z === WARNING: This task took already 95% of the 
available time budget of 282 minutes === {code}





[jira] [Commented] (FLINK-24926) Key group is not in KeyGroupRange when joining two streams with table API

2021-11-29 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17450874#comment-17450874
 ] 

Yun Tang commented on FLINK-24926:
--

Thanks for the explanation [~lincoln.86xy].

You might need to cherry-pick the fix or wait for the Flink 1.15 release, [~liuhb86].

> Key group is not in KeyGroupRange when joining two streams with table API
> -
>
> Key: FLINK-24926
> URL: https://issues.apache.org/jira/browse/FLINK-24926
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.0
>Reporter: Hongbo
>Priority: Major
>
> I have a simple test to join two streams by the event time:
>  
> {code:java}
> @Test
> void testJoinStream() {
> var settings = EnvironmentSettings
> .newInstance()
> .inStreamingMode()
> .build();
> var tableEnv = TableEnvironment.create(settings);
> var configuration = tableEnv.getConfig().getConfiguration();
> configuration.setString("table.exec.resource.default-parallelism", "2");
> var testTable = tableEnv.from(TableDescriptor.forConnector("datagen")
> .schema(Schema.newBuilder()
> .column("ts", DataTypes.TIMESTAMP(3))
> .column("v", DataTypes.INT())
> .watermark("ts", "ts - INTERVAL '1' second")
> .build())
> .option(DataGenConnectorOptions.ROWS_PER_SECOND, 2L)
> .option("fields.v.kind", "sequence")
> .option("fields.v.start", "0")
> .option("fields.v.end", "100")
> .build());
> testTable.printSchema();
> tableEnv.createTemporaryView("test", testTable );
> var joined = tableEnv.sqlQuery("SELECT ts, v, v2 from test" +
> " join (SELECT ts as ts2, v as v2 from test) on ts = ts2");
> try {
> var tableResult = 
> joined.executeInsert(TableDescriptor.forConnector("print").build());
> tableResult.await();
> } catch (InterruptedException | ExecutionException e) {
> throw new RuntimeException(e);
> }
> } {code}
> It failed within a few seconds:
> {code:java}
> (
>   `ts` TIMESTAMP(3) *ROWTIME*,
>   `v` INT,
>   WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '1' second
> )
> 1> +I[2021-11-16T17:48:24.415, 1, 1]
> 1> +I[2021-11-16T17:48:24.415, 0, 1]
> 1> +I[2021-11-16T17:48:24.415, 1, 0]
> 1> +I[2021-11-16T17:48:24.415, 0, 0]
> java.util.concurrent.ExecutionException: 
> org.apache.flink.table.api.TableException: Failed to wait job finish
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.flink.table.api.TableException: Failed to wait job finish
>     at com.microstrategy.realtime.FlinkTest.testJoinStream(FlinkTest.java:123)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
>     at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
>     at 
> 

[jira] [Updated] (FLINK-25094) The verify code in LatencyTrackingMapStateTest#verifyIterator is not actually executed

2021-11-29 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-25094:
-
Fix Version/s: 1.15.0

> The verify code in LatencyTrackingMapStateTest#verifyIterator is not actually 
> executed
> --
>
> Key: FLINK-25094
> URL: https://issues.apache.org/jira/browse/FLINK-25094
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Tests
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Minor
> Fix For: 1.15.0
>
>
> In LatencyTrackingMapStateTest, 
> iterator()/entries().iterator()/keys().iterator()/values().iterator() is 
> invoked before the verifyIterator method is invoked; that is, 
> iterator()/... is invoked before the test data is put into 
> latencyTrackingMapState. So the verify code is not actually executed, since 
> "iterator.hasNext()" is always false.
> {code:java}
> private  void verifyIterator(
> LatencyTrackingMapState 
> latencyTrackingState,
> LatencyTrackingMapState.MapStateLatencyMetrics 
> latencyTrackingStateMetric,
> Iterator iterator,
> boolean removeIterator)
> throws Exception {
> ThreadLocalRandom random = ThreadLocalRandom.current();
> for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
> latencyTrackingState.put((long) index, random.nextDouble());
> }
> int count = 1;
> while (iterator.hasNext()) {
> int expectedResult = count == SAMPLE_INTERVAL ? 0 : count;
> assertEquals(expectedResult, 
> latencyTrackingStateMetric.getIteratorHasNextCount());
> iterator.next();
> assertEquals(expectedResult, 
> latencyTrackingStateMetric.getIteratorNextCount());
> if (removeIterator) {
> iterator.remove();
> assertEquals(expectedResult, 
> latencyTrackingStateMetric.getIteratorRemoveCount());
> }
> count += 1;
> }
> // as we call #hasNext on more time than #next, to avoid complex check, 
> just reset hasNext
> // counter in the end.
> latencyTrackingStateMetric.resetIteratorHasNextCount();
> latencyTrackingState.clear();
> } {code}
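The ordering bug the description points out — creating the iterator before the puts — can be demonstrated with plain java.util collections. A minimal, self-contained sketch (the class below is illustrative, not Flink's actual test code):

```java
// Sketch of why the verification loop never runs: the iterator is created
// while the map is still empty, so hasNext() is immediately false even
// after entries are added later.
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class IteratorOrderingSketch {
    /** Returns how many entries an iterator sees when it is created BEFORE the puts. */
    public static int entriesSeenByEarlyIterator(int numEntries) {
        Map<Long, Double> map = new HashMap<>();
        // Created too early: at this point the map has no entries to traverse.
        Iterator<Map.Entry<Long, Double>> it = map.entrySet().iterator();
        for (int i = 1; i <= numEntries; i++) {
            map.put((long) i, i * 0.5);
        }
        int seen = 0;
        while (it.hasNext()) { // false right away, so the loop body never executes
            it.next();
            seen++;
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(entriesSeenByEarlyIterator(10)); // prints 0
    }
}
```

Moving the iterator creation after the puts (as the fix does for verifyIterator's callers) makes the loop body actually run.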





[jira] [Assigned] (FLINK-25094) The verify code in LatencyTrackingMapStateTest#verifyIterator is not actually executed

2021-11-29 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-25094:


Assignee: Jinzhong Li

> The verify code in LatencyTrackingMapStateTest#verifyIterator is not actually 
> executed
> --
>
> Key: FLINK-25094
> URL: https://issues.apache.org/jira/browse/FLINK-25094
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Tests
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Minor
>
> In LatencyTrackingMapStateTest, 
> iterator()/entries().iterator()/keys().iterator()/values().iterator() is 
> invoked before the verifyIterator method is invoked; that is, 
> iterator()/... is invoked before the test data is put into 
> latencyTrackingMapState. So the verify code is not actually executed, since 
> "iterator.hasNext()" is always false.
> {code:java}
> private  void verifyIterator(
> LatencyTrackingMapState 
> latencyTrackingState,
> LatencyTrackingMapState.MapStateLatencyMetrics 
> latencyTrackingStateMetric,
> Iterator iterator,
> boolean removeIterator)
> throws Exception {
> ThreadLocalRandom random = ThreadLocalRandom.current();
> for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
> latencyTrackingState.put((long) index, random.nextDouble());
> }
> int count = 1;
> while (iterator.hasNext()) {
> int expectedResult = count == SAMPLE_INTERVAL ? 0 : count;
> assertEquals(expectedResult, 
> latencyTrackingStateMetric.getIteratorHasNextCount());
> iterator.next();
> assertEquals(expectedResult, 
> latencyTrackingStateMetric.getIteratorNextCount());
> if (removeIterator) {
> iterator.remove();
> assertEquals(expectedResult, 
> latencyTrackingStateMetric.getIteratorRemoveCount());
> }
> count += 1;
> }
> // as we call #hasNext on more time than #next, to avoid complex check, 
> just reset hasNext
> // counter in the end.
> latencyTrackingStateMetric.resetIteratorHasNextCount();
> latencyTrackingState.clear();
> } {code}





[jira] [Closed] (FLINK-21823) RocksDB on FreeBSD

2021-11-28 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-21823.

Resolution: Information Provided

Closing this issue due to inactivity.

> RocksDB on FreeBSD
> --
>
> Key: FLINK-21823
> URL: https://issues.apache.org/jira/browse/FLINK-21823
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.3
>Reporter: Mårten Lindblad
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
> Can't use the RocksDB backend in FreeBSD 12.2:
>  
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_bb679bef01a6dead9d79d979cc95234a_(1/1) from any of the 1 provided restore options.
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:335)
> at 

[jira] [Updated] (FLINK-23798) Avoid using reflection to get filter when partition filter is enabled

2021-11-28 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-23798:
-
Release Note:   (was: Merged
master: 7d7a97477a50753e38e5b499effc62bb00dfcfe2
release-1.14: f7c0381eb202819b9c1ecc1e3693b31377fe2a9a)

> Avoid using reflection to get filter when partition filter is enabled
> -
>
> Key: FLINK-23798
> URL: https://issues.apache.org/jira/browse/FLINK-23798
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: PengFei Li
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1
>
>
> FLINK-20496 introduced partitioned index & filter support to Flink. However, RocksDB 
> only supports the new full filter format in this feature, so we need to 
> replace the previously configured filter if the user enables it. [The previous 
> implementation uses reflection to get the 
> filter|https://github.com/apache/flink/blob/7ff4cbdc25aa971dccaf5ce02aaf46dc1e7345cc/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java#L251-L258]
>  and we could use the API to get it after upgrading to a newer version.
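The fragile pattern the ticket removes can be sketched with plain Java reflection. The class and field below are illustrative stand-ins, NOT RocksDB's actual BlockBasedTableConfig:

```java
// Illustrative-only sketch: reading a private field via reflection because
// no public getter exists — the style FLINK-23798 replaces with a real API call.
import java.lang.reflect.Field;

public class ReflectionAccessSketch {
    /** Stand-in for a library config class that hides its filter field. */
    static final class TableConfigStandIn {
        private final String filterPolicy = "bloom-full-format";
    }

    /** Fragile pre-fix style: dig the value out via reflection. */
    public static String readFilterViaReflection() {
        try {
            TableConfigStandIn config = new TableConfigStandIn();
            Field field = TableConfigStandIn.class.getDeclaredField("filterPolicy");
            field.setAccessible(true); // breaks as soon as the field is renamed
            return (String) field.get(config);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readFilterViaReflection()); // prints bloom-full-format
    }
}
```

A public getter (as in newer RocksDB versions) removes both the fragility and the setAccessible call.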





[jira] [Commented] (FLINK-23798) Avoid using reflection to get filter when partition filter is enabled

2021-11-28 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17450169#comment-17450169
 ] 

Yun Tang commented on FLINK-23798:
--

Merged
master: 7d7a97477a50753e38e5b499effc62bb00dfcfe2
release-1.14: f7c0381eb202819b9c1ecc1e3693b31377fe2a9a

> Avoid using reflection to get filter when partition filter is enabled
> -
>
> Key: FLINK-23798
> URL: https://issues.apache.org/jira/browse/FLINK-23798
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: PengFei Li
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1
>
>
> FLINK-20496 introduced partitioned index & filter support to Flink. However, RocksDB 
> only supports the new full filter format in this feature, so we need to 
> replace the previously configured filter if the user enables it. [The previous 
> implementation uses reflection to get the 
> filter|https://github.com/apache/flink/blob/7ff4cbdc25aa971dccaf5ce02aaf46dc1e7345cc/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java#L251-L258]
>  and we could use the API to get it after upgrading to a newer version.





[jira] [Resolved] (FLINK-23798) Avoid using reflection to get filter when partition filter is enabled

2021-11-28 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-23798.
--
Release Note: 
Merged
master: 7d7a97477a50753e38e5b499effc62bb00dfcfe2
release-1.14: f7c0381eb202819b9c1ecc1e3693b31377fe2a9a
  Resolution: Fixed

> Avoid using reflection to get filter when partition filter is enabled
> -
>
> Key: FLINK-23798
> URL: https://issues.apache.org/jira/browse/FLINK-23798
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: PengFei Li
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1
>
>
> FLINK-20496 introduced partitioned index & filter support to Flink. However, RocksDB 
> only supports the new full filter format in this feature, so we need to 
> replace the previously configured filter if the user enables it. [The previous 
> implementation uses reflection to get the 
> filter|https://github.com/apache/flink/blob/7ff4cbdc25aa971dccaf5ce02aaf46dc1e7345cc/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java#L251-L258]
>  and we could use the API to get it after upgrading to a newer version.





[jira] [Updated] (FLINK-23798) Avoid using reflection to get filter when partition filter is enabled

2021-11-28 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-23798:
-
Fix Version/s: 1.14.1

> Avoid using reflection to get filter when partition filter is enabled
> -
>
> Key: FLINK-23798
> URL: https://issues.apache.org/jira/browse/FLINK-23798
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: PengFei Li
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1
>
>
> FLINK-20496 introduced partitioned index & filter support to Flink. However, RocksDB 
> only supports the new full filter format in this feature, so we need to 
> replace the previously configured filter if the user enables it. [The previous 
> implementation uses reflection to get the 
> filter|https://github.com/apache/flink/blob/7ff4cbdc25aa971dccaf5ce02aaf46dc1e7345cc/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java#L251-L258]
>  and we could use the API to get it after upgrading to a newer version.





[jira] [Resolved] (FLINK-25067) Correct the description of RocksDB's background threads

2021-11-26 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-25067.
--
Resolution: Fixed

Merged in

master: 832d412e8c34a28849b346ac1088c704622a08e1

release-1.14: 315f3bc8a44cfa2edcfbeacb1f52089406a5471b

release-1.13: 52c3931c9fda2ac8483fae312e8d265b2226a54c

> Correct the description of RocksDB's background threads
> ---
>
> Key: FLINK-25067
> URL: https://issues.apache.org/jira/browse/FLINK-25067
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1, 1.13.4
>
>
> RocksDB changed the default maximum number of concurrent background 
> flush and compaction jobs to 2 a long time ago; we should fix the related 
> documentation.





[jira] [Updated] (FLINK-21027) Add isStateKeyValueSerialized() method to KeyedStateBackend interface

2021-11-25 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-21027:
-
Fix Version/s: 1.15.0

> Add isStateKeyValueSerialized() method to KeyedStateBackend interface
> -
>
> Key: FLINK-21027
> URL: https://issues.apache.org/jira/browse/FLINK-21027
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Jark Wu
>Assignee: Yun Tang
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
> Fix For: 1.15.0
>
>
> In Table/SQL operators, we have some optimizations that reuse objects of keys 
> and records. For example, we buffer input records in {{BytesMultiMap}} and 
> use the reused object to map to the underlying memory segment to reduce byte 
> copying. 
> However, if we put the reused key and value into the Heap statebackend, the 
> result will be wrong, because it is not allowed to mutate keys and values in 
> the Heap statebackend. 
> Therefore, it would be great if {{KeyedStateBackend}} could expose such an 
> API, so that Table/SQL can dynamically decide whether to copy the keys and 
> values before putting them into state. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-21027) Add isStateKeyValueSerialized() method to KeyedStateBackend interface

2021-11-25 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-21027:


Assignee: Yun Tang

> Add isStateKeyValueSerialized() method to KeyedStateBackend interface
> -
>
> Key: FLINK-21027
> URL: https://issues.apache.org/jira/browse/FLINK-21027
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Jark Wu
>Assignee: Yun Tang
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> In Table/SQL operators, we have some optimizations that reuse objects of keys 
> and records. For example, we buffer input records in {{BytesMultiMap}} and 
> use the reused object to map to the underlying memory segment to reduce byte 
> copying. 
> However, if we put the reused key and value into the Heap statebackend, the 
> result will be wrong, because it is not allowed to mutate keys and values in 
> the Heap statebackend. 
> Therefore, it would be great if {{KeyedStateBackend}} could expose such an 
> API, so that Table/SQL can dynamically decide whether to copy the keys and 
> values before putting them into state. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25067) Correct the description of RocksDB's background threads

2021-11-25 Thread Yun Tang (Jira)
Yun Tang created FLINK-25067:


 Summary: Correct the description of RocksDB's background threads
 Key: FLINK-25067
 URL: https://issues.apache.org/jira/browse/FLINK-25067
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Runtime / State Backends
Reporter: Yun Tang
Assignee: Yun Tang
 Fix For: 1.15.0, 1.14.1, 1.13.4


RocksDB has actually had the maximum number of concurrent background flush 
and compaction jobs set to 2 for a long time; we should fix the related documentation.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24926) Key group is not in KeyGroupRange when joining two streams with table API

2021-11-25 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17449172#comment-17449172
 ] 

Yun Tang commented on FLINK-24926:
--

I am not sure whether Flink SQL supports joining a stream with itself.

The root cause is that the key selector on the join operator is actually 
EmptyRowDataKeySelector (see 
[here|https://github.com/apache/flink/blob/2c5ccd1e909d38ef18486ba55da6db454e33ca94/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java#L67]
 ), which means that no matter what key is passed, the downstream 
operator cannot set the correct key.
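For context, the key-group mechanism behind the reported error can be sketched as follows (a simplified, hypothetical illustration: Flink additionally applies a murmur hash when assigning key groups, and the class and method names here are made up):

```java
import java.util.Objects;

// Simplified sketch of key-group routing. Each key is assigned to one of
// maxParallelism key groups, and every operator instance owns a contiguous
// range of key groups; state access with a key whose group falls outside the
// owned range yields "Key group X is not in KeyGroupRange".
public class KeyGroupSketch {
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        // Flink applies a murmur hash on top of hashCode(); omitted here.
        return Math.floorMod(Objects.hashCode(key), maxParallelism);
    }

    public static boolean inRange(int keyGroup, int start, int end) {
        return keyGroup >= start && keyGroup <= end;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int keyGroup = assignToKeyGroup("some-key", maxParallelism);
        // With parallelism 2, one subtask might own [0, 63], the other [64, 127].
        System.out.println(keyGroup + " owned by subtask 0? " + inRange(keyGroup, 0, 63));
    }
}
```

If records are routed under one key but state is accessed under another (or an empty) key, the computed group can land outside the subtask's owned range, which matches the reported exception.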

> Key group is not in KeyGroupRange when joining two streams with table API
> -
>
> Key: FLINK-24926
> URL: https://issues.apache.org/jira/browse/FLINK-24926
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.0
>Reporter: Hongbo
>Priority: Major
>
> I have a simple test to join two streams by the event time:
>  
> {code:java}
> @Test
> void testJoinStream() {
> var settings = EnvironmentSettings
> .newInstance()
> .inStreamingMode()
> .build();
> var tableEnv = TableEnvironment.create(settings);
> var configuration = tableEnv.getConfig().getConfiguration();
> configuration.setString("table.exec.resource.default-parallelism", "2");
> var testTable = tableEnv.from(TableDescriptor.forConnector("datagen")
> .schema(Schema.newBuilder()
> .column("ts", DataTypes.TIMESTAMP(3))
> .column("v", DataTypes.INT())
> .watermark("ts", "ts - INTERVAL '1' second")
> .build())
> .option(DataGenConnectorOptions.ROWS_PER_SECOND, 2L)
> .option("fields.v.kind", "sequence")
> .option("fields.v.start", "0")
> .option("fields.v.end", "100")
> .build());
> testTable.printSchema();
> tableEnv.createTemporaryView("test", testTable );
> var joined = tableEnv.sqlQuery("SELECT ts, v, v2 from test" +
> " join (SELECT ts as ts2, v as v2 from test) on ts = ts2");
> try {
> var tableResult = 
> joined.executeInsert(TableDescriptor.forConnector("print").build());
> tableResult.await();
> } catch (InterruptedException | ExecutionException e) {
> throw new RuntimeException(e);
> }
> } {code}
> It failed within a few seconds:
> {code:java}
> (
>   `ts` TIMESTAMP(3) *ROWTIME*,
>   `v` INT,
>   WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '1' second
> )
> 1> +I[2021-11-16T17:48:24.415, 1, 1]
> 1> +I[2021-11-16T17:48:24.415, 0, 1]
> 1> +I[2021-11-16T17:48:24.415, 1, 0]
> 1> +I[2021-11-16T17:48:24.415, 0, 0]
> java.util.concurrent.ExecutionException: 
> org.apache.flink.table.api.TableException: Failed to wait job finish
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.flink.table.api.TableException: Failed to wait job finish
>     at com.microstrategy.realtime.FlinkTest.testJoinStream(FlinkTest.java:123)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
>     at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>     at 
> 

[jira] [Commented] (FLINK-21321) Change RocksDB incremental checkpoint re-scaling to use deleteRange

2021-11-25 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17449065#comment-17449065
 ] 

Yun Tang commented on FLINK-21321:
--

[~legojoey17] Would you please update the PR?

> Change RocksDB incremental checkpoint re-scaling to use deleteRange
> ---
>
> Key: FLINK-21321
> URL: https://issues.apache.org/jira/browse/FLINK-21321
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Joey Pereira
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> In FLINK-8790, it was suggested to use RocksDB's {{deleteRange}} API to more 
> efficiently clip the databases for the desired target group.
> During the PR for that ticket, 
> [#5582|https://github.com/apache/flink/pull/5582], the change did not end up 
> using the {{deleteRange}} method as it was an experimental feature in 
> RocksDB.
> At this point {{deleteRange}} is in a far less experimental state, though I 
> believe it is still formally "experimental". It is heavily used by others 
> such as CockroachDB and TiKV, and they have teased out several bugs in 
> complex interactions over the years.
> For certain re-scaling situations where restores trigger 
> {{restoreWithScaling}} and the DB clipping logic, this would likely reduce an 
> O(N) operation (N = state size/records) to O(1). For large-state apps, this 
> could represent a non-trivial amount of time spent re-scaling. In the case of 
> my workplace, we have an operator with 100s of billions of records in state, 
> and re-scaling was taking a long time (>>30 min, but it has been a while 
> since doing it).
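The difference between per-key clipping and range deletion can be illustrated conceptually with a plain `TreeMap` standing in for the DB (a sketch only, not RocksDB code; in RocksDB, `deleteRange` writes a single range tombstone in O(1) and defers physical removal to compaction):

```java
import java.util.TreeMap;

// Conceptual sketch of clipping a state store to a target key-group range
// [start, end]. The per-key path must visit every entry outside the range
// (O(n) in entries); RocksDB's deleteRange instead records one tombstone
// per range, independent of how many keys it covers.
public class ClipSketch {
    public static void clip(TreeMap<Integer, String> db, int start, int end) {
        db.headMap(start, false).clear(); // drop key groups below the range
        db.tailMap(end, false).clear();   // drop key groups above the range
    }

    public static void main(String[] args) {
        TreeMap<Integer, String> db = new TreeMap<>();
        for (int i = 0; i < 10; i++) {
            db.put(i, "state-of-group-" + i);
        }
        clip(db, 3, 6);
        System.out.println(db.keySet()); // [3, 4, 5, 6]
    }
}
```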



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24611) Prevent JM from discarding state on checkpoint abortion

2021-11-24 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448454#comment-17448454
 ] 

Yun Tang commented on FLINK-24611:
--

Sorry for missing the notification as I was on a two-day on-call.

[~sewen] {{Incremental state are currently only SST files, how often are those 
below 20KBytes?}}
In the general case, RocksDB should not have many 20KB-sized files. This could 
happen if the checkpoint interval is too small, which makes flushing too 
frequent. On the other hand, it might happen when there are too many states 
with extremely small memory capacity, so that the write buffer manager flushes 
empty memtables in advance (see FLINK-19238).

Dropping the placeholder handle would change how the checkpointed state size 
is reported. The checkpointed state size currently shown in the web UI is the 
incremental checkpoint size; if we drop the placeholder handle, we cannot know 
the actual incremental checkpoint size.

If dropping the placeholder handle in the changelog state-backend would make 
things easier, can we disable the placeholder handle only for the changelog 
state-backend, but still keep it for the normal usage of the RocksDB 
state-backend? The changelog state-backend would not checkpoint too often. On 
the other hand, flushing memtables too often in the low-memory case could show 
explicitly poor performance, and the user should detect that then.

> Prevent JM from discarding state on checkpoint abortion
> ---
>
> Key: FLINK-24611
> URL: https://issues.apache.org/jira/browse/FLINK-24611
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0
>Reporter: Roman Khachatryan
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> 
> MOTIVATION
> When a checkpoint is aborted, JM discards any state that was sent to it and 
> wasn't used in other checkpoints. This forces incremental state backends to 
> wait for confirmation from JM before re-using this state. For changelog 
> backend this is even more critical.
> One approach proposed was to make backends/TMs responsible for their state, 
> until it's not shared with other TMs, i.e. until rescaling (private/shared 
> state ownership track: FLINK-23342 and more).
> However, that approach is quite invasive.
> An alternative solution would be:
> 1. SharedStateRegistry remembers the latest checkpoint for each shared state 
> (instead of usage count currently)
> 2. CompletedCheckpointStore notifies it about the lowest valid checkpoint (on 
> subsumption)
> 3. SharedStateRegistry then discards any state associated with the lower 
> (subsumed/aborted) checkpoints
> So the aborted checkpoint can only be discarded after some subsequent 
> successful checkpoint (which can mark state as used).
> Mostly JM code is changed. IncrementalRemoteKeyedStateHandle.discard needs to 
> be adjusted.
> Backends don't re-upload state.
> 
> IMPLEMENTATION CONSIDERATIONS
> On subsumption, JM needs to find all the unused state and discard it.
> This can either be done by
> 1) simply traversing all entries; or by 
> 2) maintaining a set of entries per checkpoint (e.g. a SortedMap from 
> checkpoint ID to a Set of entries). This allows skipping unnecessary 
> traversal at the cost of higher memory usage
> In both cases:
> - each entry stores last checkpoint ID it was used in (long)
> - key is hashed (even with plain traversal, map.entrySet.iterator.remove() 
> computes hash internally)
> Because of the need to maintain secondary sets, (2) isn't asymptotically 
> better than (1), and is likely worse in practice and requires more memory 
> (see discussion in the comments). So approach (1) seems reasonable.
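Approach (1) above could be sketched roughly as follows (illustrative class and method names, not Flink's actual {{SharedStateRegistry}} API):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Sketch of approach (1): each shared-state entry stores the last checkpoint
// ID it was used in; on subsumption, all entries are traversed and those not
// used since the lowest valid checkpoint are discarded.
public class RegistrySketch {
    private final Map<String, Long> lastUsedIn = new HashMap<>();

    public void register(String stateKey, long checkpointId) {
        lastUsedIn.merge(stateKey, checkpointId, Math::max);
    }

    // Called when 'lowestValidCheckpoint' subsumes everything before it;
    // returns the keys of state that can now be deleted.
    public Set<String> unregisterUnusedState(long lowestValidCheckpoint) {
        Set<String> discarded = new HashSet<>();
        Iterator<Map.Entry<String, Long>> it = lastUsedIn.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() < lowestValidCheckpoint) {
                discarded.add(e.getKey());
                it.remove(); // key hash is computed internally here anyway
            }
        }
        return discarded;
    }
}
```

This mirrors the corner case above: state registered only by an aborted checkpoint 1 survives until some later checkpoint subsumes it without re-using it.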
> 
> CORNER CASES
> The following cases shouldn't pose any difficulties:
> 1. Recovery, re-scaling, and state used by not all or by no tasks - we still 
> register all states on recovery even after FLINK-22483/FLINK-24086
> 2. Cross-task state sharing - not an issue as long as everything is managed 
> by JM
> 3. Dependencies between SharedStateRegistry and CompletedCheckpointStore - 
> simple after FLINK-24086
> 4. Multiple concurrent checkpoints (below)
> Consider the following case:
> (nr. concurrent checkpoints > 1)
> 1. checkpoint 1 starts, TM reports that it uses file f1; checkpoint 1 gets 
> aborted - f1 is now tracked
> 2. savepoint 2 starts, it *will* use f1
> 3. checkpoint 3 starts and finishes; it does NOT use file f1
> When a checkpoint finishes, all pending checkpoints before it are aborted - 
> but not savepoints.
> Savepoints currently are 

[jira] [Commented] (FLINK-15571) Create a Redis Streams Connector for Flink

2021-11-23 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448413#comment-17448413
 ] 

Yun Tang commented on FLINK-15571:
--

[~monster#12] Already assigned to you.

> Create a Redis Streams Connector for Flink
> --
>
> Key: FLINK-15571
> URL: https://issues.apache.org/jira/browse/FLINK-15571
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Tugdual Grall
>Assignee: ZhuoYu Chen
>Priority: Minor
>  Labels: pull-request-available
>
> Redis has a "log data structure" called Redis Streams, it would be nice to 
> integrate Redis Streams and Apache Flink as:
>  * Source
>  * Sink
> See Redis Streams introduction: [https://redis.io/topics/streams-intro]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-15571) Create a Redis Streams Connector for Flink

2021-11-23 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-15571:


Assignee: ZhuoYu Chen

> Create a Redis Streams Connector for Flink
> --
>
> Key: FLINK-15571
> URL: https://issues.apache.org/jira/browse/FLINK-15571
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Tugdual Grall
>Assignee: ZhuoYu Chen
>Priority: Minor
>  Labels: pull-request-available
>
> Redis has a "log data structure" called Redis Streams, it would be nice to 
> integrate Redis Streams and Apache Flink as:
>  * Source
>  * Sink
> See Redis Streams introduction: [https://redis.io/topics/streams-intro]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-11868) [filesystems] Introduce listStatusIterator API to file system

2021-11-23 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-11868:


Assignee: Yun Tang

> [filesystems] Introduce listStatusIterator API to file system
> -
>
> Key: FLINK-11868
> URL: https://issues.apache.org/jira/browse/FLINK-11868
> Project: Flink
>  Issue Type: New Feature
>  Components: FileSystems
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
> Fix For: 1.15.0
>
>
> From existing experience, we know {{listStatus}} is expensive for many 
> distributed file systems, especially when the folder contains too many files. 
> This method not only blocks the thread until the result is returned but also 
> could cause OOM because the returned array of {{FileStatus}} can be really 
> large. We should have already learned this from FLINK-7266 and FLINK-8540.
> However, listing file statuses under a path is really helpful in many 
> situations. Thankfully, many distributed file systems noticed that and 
> provide APIs such as 
> {{[listStatusIterator|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#listStatusIterator(org.apache.hadoop.fs.Path)]}}
>  to call the file system on demand.
>  
> We should also introduce this API and replace the current implementation, 
> which uses {{listStatus}}.
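The on-demand listing idea can be illustrated with the JDK's lazy directory stream (a sketch only; Flink's {{FileSystem}} abstraction and Hadoop's {{listStatusIterator}} differ in detail):

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of the on-demand idea using the JDK: Files.newDirectoryStream
// yields entries lazily, so a directory with millions of files never has to
// be materialized as one huge array, unlike an eager listStatus-style call
// returning FileStatus[].
public class ListSketch {
    public static int countEntries(Path dir) throws IOException {
        int count = 0;
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path ignored : stream) {
                count++; // each entry is fetched on demand
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("list-sketch");
        Files.createFile(dir.resolve("part-0"));
        Files.createFile(dir.resolve("part-1"));
        System.out.println(countEntries(dir)); // 2
    }
}
```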



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-15571) Create a Redis Streams Connector for Flink

2021-11-23 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448382#comment-17448382
 ] 

Yun Tang edited comment on FLINK-15571 at 11/24/21, 6:21 AM:
-

[~monster#12] Currently, the Flink community encourages contributors to add 
new connectors to [flink packages|https://flink-packages.org/] and then 
contribute to the official [flink-connector 
repo|https://github.com/apache/flink-connectors] (it has not been set up 
completely).


was (Author: yunta):
[~monster#12] Currently, the Flink community encourages contributors to add 
new connectors to [flink packages|https://flink-packages.org/] and then 
contribute to the official [flink-connector 
repo|https://github.com/apache/flink-connectors] (it has not been set up 
completely).

> Create a Redis Streams Connector for Flink
> --
>
> Key: FLINK-15571
> URL: https://issues.apache.org/jira/browse/FLINK-15571
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Tugdual Grall
>Priority: Minor
>  Labels: pull-request-available
>
> Redis has a "log data structure" called Redis Streams, it would be nice to 
> integrate Redis Streams and Apache Flink as:
>  * Source
>  * Sink
> See Redis Streams introduction: [https://redis.io/topics/streams-intro]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-15571) Create a Redis Streams Connector for Flink

2021-11-23 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448382#comment-17448382
 ] 

Yun Tang commented on FLINK-15571:
--

[~monster#12] Currently, the Flink community encourages contributors to add 
new connectors to [flink packages|https://flink-packages.org/] and then 
contribute to the official [flink-connector 
repo|https://github.com/apache/flink-connectors] (it has not been set up 
completely).

> Create a Redis Streams Connector for Flink
> --
>
> Key: FLINK-15571
> URL: https://issues.apache.org/jira/browse/FLINK-15571
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Tugdual Grall
>Priority: Minor
>  Labels: pull-request-available
>
> Redis has a "log data structure" called Redis Streams, it would be nice to 
> integrate Redis Streams and Apache Flink as:
>  * Source
>  * Sink
> See Redis Streams introduction: [https://redis.io/topics/streams-intro]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24815) Reduce the cpu cost of calculating stateSize during state allocation

2021-11-23 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448334#comment-17448334
 ] 

Yun Tang commented on FLINK-24815:
--

Apart from calculating the state size lazily, I like the idea of storing the 
state size in metadata, as we could also know the full state size and 
represent it in the UI (the current state size calculated on the JM side is 
actually the incremental state size).

> Reduce the cpu cost of calculating stateSize during state allocation
> 
>
> Key: FLINK-24815
> URL: https://issues.apache.org/jira/browse/FLINK-24815
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.14.0
>Reporter: ming li
>Priority: Major
>
> When a task fails over, we will reassign the state for each subtask and 
> create a new {{OperatorSubtaskState}} object. At this time, the {{stateSize}} 
> field in the {{OperatorSubtaskState}} will be recalculated. When using an 
> incremental {{Checkpoint}}, computing this field requires traversing all 
> shared states and accumulating their sizes.
> Taking a job with a parallelism of 2000 and 100 shared states for each task 
> as an example, it needs to traverse 2000 * 100 = 200,000 entries. At this 
> time, the CPU of the JM scheduling thread will be fully occupied.
> I think we can try to provide a constructor taking {{stateSize}} for 
> {{OperatorSubtaskState}}, or delay the calculation of {{stateSize}}.
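The delayed-calculation variant might look roughly like this (illustrative names; the real {{OperatorSubtaskState}} holds state handles rather than plain sizes):

```java
import java.util.List;

// Sketch of deferring the stateSize computation: instead of summing all
// shared-state sizes eagerly on every reassignment, compute the sum on first
// access and memoize it, so the JM scheduling thread pays the traversal cost
// only if and when the size is actually needed.
public class SubtaskStateSketch {
    private final List<Long> sharedStateSizes;
    private long stateSize = -1; // -1 means "not yet computed"

    public SubtaskStateSketch(List<Long> sharedStateSizes) {
        this.sharedStateSizes = sharedStateSizes; // no traversal here
    }

    public long getStateSize() {
        if (stateSize < 0) {
            stateSize = sharedStateSizes.stream().mapToLong(Long::longValue).sum();
        }
        return stateSize;
    }
}
```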



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24785) Relocate RocksDB's log under flink log directory by default

2021-11-22 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447345#comment-17447345
 ] 

Yun Tang commented on FLINK-24785:
--

I think you have forgotten the configuration option 
[state.backend.rocksdb.log.dir|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-log-dir].

[~nicholasjiang], thanks for your enthusiasm to take this ticket; it is 
already assigned to you. Please follow the 
[code-contribution-process|https://flink.apache.org/contributing/contribute-code.html#code-contribution-process]
 to get the assignment first next time.

> Relocate RocksDB's log under flink log directory by default
> ---
>
> Key: FLINK-24785
> URL: https://issues.apache.org/jira/browse/FLINK-24785
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Nicholas Jiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Previously, RocksDB's log was located in its own DB folder, which made 
> debugging RocksDB not so easy. We could let RocksDB's log stay in Flink's 
> log directory by default.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-24785) Relocate RocksDB's log under flink log directory by default

2021-11-22 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-24785:


Assignee: Nicholas Jiang

> Relocate RocksDB's log under flink log directory by default
> ---
>
> Key: FLINK-24785
> URL: https://issues.apache.org/jira/browse/FLINK-24785
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Nicholas Jiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Previously, RocksDB's log was located in its own DB folder, which made 
> debugging RocksDB not so easy. We could let RocksDB's log stay in Flink's 
> log directory by default.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24918) Support to specify the data dir for state benchmark

2021-11-21 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447188#comment-17447188
 ] 

Yun Tang commented on FLINK-24918:
--

Merged in flink-master: 726eead617bb3392df6a6ef1681de62c631bbeb5

> Support to specify the data dir for state benchmark 
> 
>
> Key: FLINK-24918
> URL: https://issues.apache.org/jira/browse/FLINK-24918
> Project: Flink
>  Issue Type: Improvement
>  Components: Benchmarks, Runtime / State Backends
>Reporter: Aitozi
>Assignee: Aitozi
>Priority: Minor
>  Labels: pull-request-available
>
> {{StateBackendBenchmarkUtils}} uses null as the parent dir to create the 
> temp dir, which will finally use /tmp as the data dir. This has two downsides:
> 1. The /tmp dir is often mounted with tmpfs, which may store data in memory; 
> this will affect the result of the RocksDB benchmark.
> 2. It does not support using the benchmark to measure the performance of a 
> new device. 
> So I propose to enhance the state benchmark to support specifying the data 
> dir, avoiding {{/tmp}} as the default.
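The proposal could be sketched like this (the parameter name is hypothetical; the point is that an explicitly configured parent directory wins over the {{java.io.tmpdir}} fallback, which is often tmpfs-backed):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch: create the benchmark data dir under a configured parent when one
// is given, and only fall back to the JVM's default temp location (/tmp on
// most Linux systems) when it is not.
public class BenchmarkDirSketch {
    public static Path createDataDir(String configuredParent) throws IOException {
        if (configuredParent != null) {
            return Files.createTempDirectory(Path.of(configuredParent), "state-bench");
        }
        return Files.createTempDirectory("state-bench"); // java.io.tmpdir fallback
    }
}
```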



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24926) Key group is not in KeyGroupRange when joining two streams with table API

2021-11-21 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447187#comment-17447187
 ] 

Yun Tang commented on FLINK-24926:
--

[~liuhb86] Can you reproduce this error every time?
I used the test code below to verify but never came across your error message 
after running it for minutes.

{code:java}
  @Test
  def testJoinStream(): Unit = {
val settings = EnvironmentSettings
  .newInstance()
  .inStreamingMode()
  .build();
val tableEnv = TableEnvironment.create(settings)
val configuration = tableEnv.getConfig.getConfiguration
configuration.setString("table.exec.resource.default-parallelism", "2");

val testTable = tableEnv.from(TableDescriptor.forConnector("datagen")
  .schema(Schema.newBuilder()
.column("ts", DataTypes.TIMESTAMP(3))
.column("v", DataTypes.INT())
.watermark("ts", "ts - INTERVAL '1' second")
.build())
  .option("rows-per-second", "2")
  .option("fields.v.kind", "sequence")
  .option("fields.v.start", "0")
  .option("fields.v.end", "100")
  .build())
testTable.printSchema();
tableEnv.createTemporaryView("test", testTable );

val joined = tableEnv.sqlQuery("SELECT ts, v, v2 from test" +
  " join (SELECT ts as ts2, v as v2 from test) on ts = ts2");

  val tableResult = 
joined.executeInsert(TableDescriptor.forConnector("print").build());
  tableResult.await()
  }
{code}
 

> Key group is not in KeyGroupRange when joining two streams with table API
> -
>
> Key: FLINK-24926
> URL: https://issues.apache.org/jira/browse/FLINK-24926
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.0
>Reporter: Hongbo
>Priority: Major
>
> I have a simple test to join two streams by the event time:
>  
> {code:java}
> @Test
> void testJoinStream() {
> var settings = EnvironmentSettings
> .newInstance()
> .inStreamingMode()
> .build();
> var tableEnv = TableEnvironment.create(settings);
> var configuration = tableEnv.getConfig().getConfiguration();
> configuration.setString("table.exec.resource.default-parallelism", "2");
> var testTable = tableEnv.from(TableDescriptor.forConnector("datagen")
> .schema(Schema.newBuilder()
> .column("ts", DataTypes.TIMESTAMP(3))
> .column("v", DataTypes.INT())
> .watermark("ts", "ts - INTERVAL '1' second")
> .build())
> .option(DataGenConnectorOptions.ROWS_PER_SECOND, 2L)
> .option("fields.v.kind", "sequence")
> .option("fields.v.start", "0")
> .option("fields.v.end", "100")
> .build());
> testTable.printSchema();
> tableEnv.createTemporaryView("test", testTable );
> var joined = tableEnv.sqlQuery("SELECT ts, v, v2 from test" +
> " join (SELECT ts as ts2, v as v2 from test) on ts = ts2");
> try {
> var tableResult = 
> joined.executeInsert(TableDescriptor.forConnector("print").build());
> tableResult.await();
> } catch (InterruptedException | ExecutionException e) {
> throw new RuntimeException(e);
> }
> } {code}
> It failed within a few seconds:
> {code:java}
> (
>   `ts` TIMESTAMP(3) *ROWTIME*,
>   `v` INT,
>   WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '1' second
> )
> 1> +I[2021-11-16T17:48:24.415, 1, 1]
> 1> +I[2021-11-16T17:48:24.415, 0, 1]
> 1> +I[2021-11-16T17:48:24.415, 1, 0]
> 1> +I[2021-11-16T17:48:24.415, 0, 0]
> java.util.concurrent.ExecutionException: 
> org.apache.flink.table.api.TableException: Failed to wait job finish
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.flink.table.api.TableException: Failed to wait job finish
>     at com.microstrategy.realtime.FlinkTest.testJoinStream(FlinkTest.java:123)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
>     at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
>     at 
> 

[jira] [Commented] (FLINK-24967) Make the IO pattern configureable in state benchmarks

2021-11-21 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447186#comment-17447186
 ] 

Yun Tang commented on FLINK-24967:
--

I think introducing various sizes of state-related operations should be 
helpful. The current configuration of IO size just balances the end-to-end 
benchmark running duration and the ability to verify the performance of 
different state backends.

> Make the IO pattern configureable in state benchmarks
> -
>
> Key: FLINK-24967
> URL: https://issues.apache.org/jira/browse/FLINK-24967
> Project: Flink
>  Issue Type: Improvement
>  Components: Benchmarks, Runtime / State Backends
>Reporter: Aitozi
>Priority: Minor
>
> Currently, the state benchmark's IO sizes are controlled by 
> {{StateBenchmarkConstants}}, which is not flexible to change. It's not easy 
> to test the performance under different IO sizes/patterns and different 
> disks (which can be solved by 
> [FLINK-24918|https://issues.apache.org/jira/browse/FLINK-24918]). I propose 
> to make the state benchmark more configurable.
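One way to make the constants configurable (property names and defaults here are hypothetical, chosen only to show the pattern):

```java
// Sketch: read benchmark IO parameters from system properties, falling back
// to hard-coded defaults like those in StateBenchmarkConstants today.
public class BenchmarkConfigSketch {
    public static final long KEY_COUNT =
            Long.getLong("benchmark.state.keyCount", 1_000_000L);
    public static final int VALUE_SIZE =
            Integer.getInteger("benchmark.state.valueSize", 1024);

    public static void main(String[] args) {
        System.out.println(KEY_COUNT + " keys, " + VALUE_SIZE + "-byte values");
    }
}
```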



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24918) Support to specify the data dir for state benchmark

2021-11-18 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17445744#comment-17445744
 ] 

Yun Tang commented on FLINK-24918:
--

[~aitozi] Already assigned to you, please go ahead.

> Support to specify the data dir for state benchmark 
> 
>
> Key: FLINK-24918
> URL: https://issues.apache.org/jira/browse/FLINK-24918
> Project: Flink
>  Issue Type: Improvement
>  Components: Benchmarks, Runtime / State Backends
>Reporter: Aitozi
>Assignee: Aitozi
>Priority: Minor
>
> {{StateBackendBenchmarkUtils}} uses null as the parent dir when creating the 
> temp dir, which ultimately places the data under /tmp. This has two downsides:
> 1. the /tmp dir is often mounted as tmpfs, which may store data in memory and 
> will affect the results of the RocksDB benchmark
> 2. the benchmark cannot measure the performance on a different 
> device. 
> So I propose to enhance the state benchmark to support specifying the 
> data dir, avoiding {{/tmp}} as the default.





[jira] [Assigned] (FLINK-24918) Support to specify the data dir for state benchmark

2021-11-18 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-24918:


Assignee: Aitozi

> Support to specify the data dir for state benchmark 
> 
>
> Key: FLINK-24918
> URL: https://issues.apache.org/jira/browse/FLINK-24918
> Project: Flink
>  Issue Type: Improvement
>  Components: Benchmarks, Runtime / State Backends
>Reporter: Aitozi
>Assignee: Aitozi
>Priority: Minor
>
> {{StateBackendBenchmarkUtils}} uses null as the parent dir when creating the 
> temp dir, which ultimately places the data under /tmp. This has two downsides:
> 1. the /tmp dir is often mounted as tmpfs, which may store data in memory and 
> will affect the results of the RocksDB benchmark
> 2. the benchmark cannot measure the performance on a different 
> device. 
> So I propose to enhance the state benchmark to support specifying the 
> data dir, avoiding {{/tmp}} as the default.





[jira] [Commented] (FLINK-24918) Support to specify the data dir for state benchmark

2021-11-16 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17444970#comment-17444970
 ] 

Yun Tang commented on FLINK-24918:
--

[~aitozi], I think the suggestion makes sense, as different disks can have a 
performance impact for RocksDB-like state backends. 
The current root dir actually relies on Java's File#createTempFile, which can be 
changed via the system property {{java.io.tmpdir}}. I think we could introduce a new 
option to configure the root dir and give explicit descriptions in the 
https://github.com/apache/flink-benchmarks documentation.
Do you want to take this ticket?
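
A minimal sketch of what such an option could look like, assuming a hypothetical {{benchmark.state.data-dir}} property (the real option name and wiring would be decided in flink-benchmarks):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Sketch only: create the benchmark working dir under a configurable root. */
public class BenchmarkTempDir {

    // Hypothetical property name for illustration.
    static final String ROOT_DIR_PROPERTY = "benchmark.state.data-dir";

    /** Uses the configured root if set; otherwise falls back to java.io.tmpdir. */
    static Path createBenchmarkDir() {
        try {
            String configuredRoot = System.getProperty(ROOT_DIR_PROPERTY);
            if (configuredRoot != null) {
                Path root = Paths.get(configuredRoot);
                Files.createDirectories(root);
                return Files.createTempDirectory(root, "benchmark");
            }
            // Default today: java.io.tmpdir, which may be tmpfs-backed (in memory).
            return Files.createTempDirectory("benchmark");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(createBenchmarkDir().toAbsolutePath());
    }
}
```

This keeps today's behavior as the default while letting a run like {{-Dbenchmark.state.data-dir=/mnt/ssd/bench}} target a specific device.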

> Support to specify the data dir for state benchmark 
> 
>
> Key: FLINK-24918
> URL: https://issues.apache.org/jira/browse/FLINK-24918
> Project: Flink
>  Issue Type: Improvement
>  Components: Benchmarks, Runtime / State Backends
>Reporter: Aitozi
>Priority: Minor
>
> {{StateBackendBenchmarkUtils}} use null as the parent dir to create temp dir, 
> which will finally use the /tmp as the data dir. It has two downsides:
> 1. the /tmp dir often mount with tmpfs, which may store data in memory. it 
> will affect the result of rocksdb benchmark
> 2. It can not support to use benchmark to measure the performance on a new 
> device. 
> So I purpose to enhance the state benchmark to support specify the default 
> data dir. And avoiding to use the {{/tmp}} as default.





[jira] [Created] (FLINK-24932) Frocksdb cannot run on Apple M1

2021-11-16 Thread Yun Tang (Jira)
Yun Tang created FLINK-24932:


 Summary: Frocksdb cannot run on Apple M1
 Key: FLINK-24932
 URL: https://issues.apache.org/jira/browse/FLINK-24932
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Yun Tang


After we bumped the RocksDB version up to 6.20.3, RocksDB can run on Linux 
ARM clusters. However, according to feedback from Robert, Apple M1 machines 
cannot run FRocksDB yet:

{code:java}
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255)
 ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
 ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
 ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
 ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
 ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
 ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
 ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
 ~[flink-runtime-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
~[flink-runtime-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) 
~[flink-runtime-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 
~[flink-runtime-1.14.0.jar:1.14.0]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for StreamFlatMap_c21234bcbf1e8eb4c61f1927190efebd_(1/1) from any of 
the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
 ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
 ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
 ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0]
... 11 more
Caused by: java.io.IOException: Could not load the native RocksDB library
at 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(EmbeddedRocksDBStateBackend.java:882)
 ~[flink-statebackend-rocksdb_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:402)
 ~[flink-statebackend-rocksdb_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:345)
 ~[flink-statebackend-rocksdb_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:87)
 ~[flink-statebackend-rocksdb_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
 ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
 ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
 ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
 ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
 ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0]
... 11 more
Caused by: java.lang.UnsatisfiedLinkError: 

[jira] [Commented] (FLINK-13598) frocksdb doesn't have arm release

2021-11-16 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17444959#comment-17444959
 ] 

Yun Tang commented on FLINK-13598:
--

[~rmetzger], RocksJava does not support Apple M1 yet (see 
[rocksdb/issues/7720|https://github.com/facebook/rocksdb/issues/7720]). I think 
this would not impact cluster usage but might annoy developers.
According to the issue description, the fix might not be too complex, 
and we could resolve this with the next upgrade of RocksDB.
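
Until then, one small mitigation could be an explicit platform check before loading the native library, so affected users get a clear message instead of a raw UnsatisfiedLinkError. This is only an illustration, not Flink's actual code:

```java
/**
 * Sketch: detect Apple Silicon before attempting to load the native
 * FRocksDB library. Names and wiring are hypothetical.
 */
public class NativeLibraryPlatformCheck {

    static boolean isAppleSilicon() {
        String os = System.getProperty("os.name").toLowerCase();
        String arch = System.getProperty("os.arch").toLowerCase();
        return os.contains("mac") && (arch.contains("aarch64") || arch.contains("arm64"));
    }

    static void checkBeforeLoading() {
        if (isAppleSilicon()) {
            // Fail fast with an actionable message instead of UnsatisfiedLinkError.
            throw new IllegalStateException(
                    "The bundled FRocksDB native library has no darwin-arm64 build yet; "
                            + "see FLINK-24932 and rocksdb issue 7720.");
        }
    }

    public static void main(String[] args) {
        System.out.println("Apple Silicon detected: " + isAppleSilicon());
    }
}
```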

> frocksdb doesn't have arm release 
> --
>
> Key: FLINK-13598
> URL: https://issues.apache.org/jira/browse/FLINK-13598
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Affects Versions: 1.9.0
>Reporter: wangxiyuan
>Assignee: Mika Naylor
>Priority: Major
> Fix For: 1.14.0
>
> Attachments: image-2020-08-20-09-22-24-021.png
>
>
> Flink now uses frocksdb, a fork of rocksdb, for the module 
> *flink-statebackend-rocksdb*. It doesn't have an ARM release.
> rocksdb supports ARM since 
> [v6.2.2|https://search.maven.org/artifact/org.rocksdb/rocksdbjni/6.2.2/jar].
> Can frocksdb publish an ARM package as well?
> Alternatively, AFAIK Flink stopped using rocksdb directly because of some bugs 
> in the past. Have those bugs been fixed in rocksdb already? Can 
> Flink use rocksdb directly again now?





[jira] [Resolved] (FLINK-24784) Enable state.backend.latency-track.state-name-as-variable by default

2021-11-14 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-24784.
--
Resolution: Fixed

merged in master: 07d2585e9db734f080a672fd8884a1906259cfab

> Enable state.backend.latency-track.state-name-as-variable by default
> 
>
> Key: FLINK-24784
> URL: https://issues.apache.org/jira/browse/FLINK-24784
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics, Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Nicholas Jiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> This could help improve the usability of state access latency tracking.





[jira] [Assigned] (FLINK-24784) Enable state.backend.latency-track.state-name-as-variable by default

2021-11-14 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-24784:


Assignee: Nicholas Jiang

> Enable state.backend.latency-track.state-name-as-variable by default
> 
>
> Key: FLINK-24784
> URL: https://issues.apache.org/jira/browse/FLINK-24784
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics, Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Nicholas Jiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> This could help improve the usability of state access latency tracking.





[jira] [Updated] (FLINK-24846) AsyncWaitOperator fails during stop-with-savepoint

2021-11-12 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-24846:
-
Component/s: Runtime / Checkpointing

> AsyncWaitOperator fails during stop-with-savepoint
> --
>
> Key: FLINK-24846
> URL: https://issues.apache.org/jira/browse/FLINK-24846
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Task
>Affects Versions: 1.14.0
>Reporter: Piotr Nowojski
>Priority: Critical
> Attachments: log-jm.txt
>
>
> {noformat}
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox$MailboxClosedException:
>  Mailbox is in state QUIESCED, but is required to be in state OPEN for put 
> operations.
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkPutStateConditions(TaskMailboxImpl.java:269)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.put(TaskMailboxImpl.java:197)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.execute(MailboxExecutorImpl.java:74)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at 
> org.apache.flink.api.common.operators.MailboxExecutor.execute(MailboxExecutor.java:103)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at 
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:304)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at 
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.access$100(AsyncWaitOperator.java:78)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at 
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.processResults(AsyncWaitOperator.java:370)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at 
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.lambda$processInMailbox$0(AsyncWaitOperator.java:351)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) 
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.drain(MailboxProcessor.java:177)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:854)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:767)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) 
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) 
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at java.lang.Thread.run(Thread.java:829) ~[?:?]
> {noformat}
> As reported by a user on [the mailing 
> list:|https://mail-archives.apache.org/mod_mbox/flink-user/202111.mbox/%3CCAO6dnLwtLNxkr9qXG202ysrnse18Wgvph4hqHZe3ar8cuXAfDw%40mail.gmail.com%3E]
> {quote}
> I failed to stop a job with savepoint with the following message:
> Inconsistent execution state after stopping with savepoint. At least one 
> execution is still in one of the following states: FAILED, CANCELED. A global 
> fail-over is triggered to recover the job 452594f3ec5797f399e07f95c884a44b.
> The job manager said
>  A savepoint was created at 
> hdfs://mobdata-flink-hdfs/driving-habits/svpts/savepoint-452594-f60305755d0e 
> but the corresponding job 452594f3ec5797f399e07f95c884a44b didn't terminate 
> successfully.
> while complaining about
> Mailbox is in state QUIESCED, but is required to be in state OPEN for put 
> operations.
> Is it okay to ignore this kind of error?
> Please see the attached files for the detailed context.
> FYI, 
> - I used the latest 1.14.0
> - I started the job with "$FLINK_HOME"/bin/flink run --target yarn-per-job
> - I couldn't reproduce the exception using the same jar so I might not able 
> to provide DUBUG messages
> {quote}





[jira] [Commented] (FLINK-24852) Cleanup of Orphaned Incremental State Artifacts

2021-11-12 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17442845#comment-17442845
 ] 

Yun Tang commented on FLINK-24852:
--

I think the suggestion to use the "previous+1" checkpoint id should be okay 
for the changelog case to avoid unexpected deletion.

I think cleaning folders with a different job id is very risky. We once created 
a standalone tool outside Flink itself, which analyzes the 
{{_metadata}} and deletes files that are not referenced in the {{_metadata}} and are 
older than the timestamp of the last successful checkpoint. It might not be wise 
for the community to offer such a tool, as different DFS implementations might 
not guarantee that still-in-use files have larger timestamps than the last 
complete checkpoint. Moreover, we could also face a {{_metadata}} with a larger 
checkpoint id but an older timestamp (possibly caused by a fixed job id across 
different runs), which would make the cleaning logic very complex and prone to 
deleting files by mistake.
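
For reference, the enumerate/skip/compare procedure proposed in the issue description below could be sketched roughly like this. The artifact naming scheme ("checkpointId_stateName") and all types here are assumptions for illustration, not Flink's actual API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch of the orphaned-state cleanup procedure (assumed types and naming). */
public class OrphanedStateCleaner {

    /** Assumes artifact names are formatted as "checkpointId_stateName". */
    static long checkpointIdOf(String artifactName) {
        return Long.parseLong(artifactName.substring(0, artifactName.indexOf('_')));
    }

    static List<String> findOrphans(
            Collection<String> allArtifacts,          // e.g. files in "shared/"
            long oldestRetainedCheckpointId,
            Set<String> sharedStateRegistrySnapshot) {
        List<String> orphans = new ArrayList<>();
        for (String artifact : allArtifacts) {
            // Skip artifacts from checkpoints >= oldest retained: they may
            // belong to a pending or recently cancelled checkpoint.
            if (checkpointIdOf(artifact) >= oldestRetainedCheckpointId) {
                continue;
            }
            // Old enough and unreferenced by the registry => orphaned.
            if (!sharedStateRegistrySnapshot.contains(artifact)) {
                orphans.add(artifact);
            }
        }
        return orphans;
    }

    public static void main(String[] args) {
        System.out.println(findOrphans(
                Arrays.asList("3_a", "5_b", "9_c"), 7L,
                new HashSet<>(Arrays.asList("3_a")))); // prints [5_b]
    }
}
```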



> Cleanup of Orphaned Incremental State Artifacts
> ---
>
> Key: FLINK-24852
> URL: https://issues.apache.org/jira/browse/FLINK-24852
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.14.0
>Reporter: Stephan Ewen
>Priority: Major
>
> Shared State Artifacts (state files in the "shared" folder in the DFS / 
> ObjectStore) can become orphaned in various situations:
> * When a TaskManager fails right after it created a state file but before the 
> checkpoint was ack-ed to the JobManager, that state file will be orphaned.
> * When the JobManager fails all state newly added for the currently pending 
> checkpoint will be orphaned.
> These state artifacts are currently impossible to be cleaned up manually, 
> because it isn't easily possible to understand whether they are still being 
> used (referenced by any checkpoint).
> We should introduce a "garbage collector" that identifies and deletes such 
> orphaned state artifacts.
> h2. Idea for a cleanup mechanism
> A periodic cleanup thread would periodically execute a cleanup procedure that 
> searches for and deletes the orphaned artifacts.
> To identify those artifacts, the cleanup procedure needs the following inputs:
> * The oldest retained checkpoint ID
> * A snapshot of the shared state registry
> * A way to identify for each state artifact from which checkpoint it was 
> created.
> The cleanup procedure would
> * enumerate all state artifacts (for example files in the "shared" directory)
> * For each one check whether it was created earlier than the oldest retained 
> checkpoint. If not, that artifact would be skipped, because it might come 
> from a later pending checkpoint, or later canceled checkpoint.
> * Finally, the procedure checks if the state artifact is known by the shared 
> state registry. If yes, the artifact is kept, if not, it is orphaned and will 
> be deleted.
> Because the cleanup procedure is specific to the checkpoint storage, it 
> should probably be instantiated from the checkpoint storage.
> To make it possible to identify the checkpoint for which a state artifact was 
> created, we can put that checkpoint ID into the state file name, for example 
> format the state name as {{"_"}}.





[jira] [Updated] (FLINK-24841) ChainOrderTest.testMigrationAndRestore fails on AZP

2021-11-12 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-24841:
-
Component/s: Runtime / Checkpointing
 (was: Runtime / State Backends)

> ChainOrderTest.testMigrationAndRestore fails on AZP
> ---
>
> Key: FLINK-24841
> URL: https://issues.apache.org/jira/browse/FLINK-24841
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Runtime / Checkpointing
>Affects Versions: 1.15.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.15.0
>
>
> The test {{ChainOrderTest.testMigrationAndRestore}} fails on AZP with
> {code}
> 2021-11-09T00:07:04.9969557Z Nov 09 00:07:04 [ERROR] Tests run: 12, Failures: 
> 0, Errors: 1, Skipped: 0, Time elapsed: 97.296 s <<< FAILURE! - in 
> org.apache.flink.test.state.operator.restore.unkeyed.ChainOrderTest
> 2021-11-09T00:07:04.9970349Z Nov 09 00:07:04 [ERROR] 
> testMigrationAndRestore[Migrate Savepoint: 1.5]  Time elapsed: 73.544 s  <<< 
> ERROR!
> 2021-11-09T00:07:04.9984239Z Nov 09 00:07:04 
> java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Job 
> 2f4599006c2734303b1d6033f69844e9 is not a streaming job.
> 2021-11-09T00:07:04.9985462Z Nov 09 00:07:04  at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
> 2021-11-09T00:07:04.9986224Z Nov 09 00:07:04  at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
> 2021-11-09T00:07:04.9986910Z Nov 09 00:07:04  at 
> org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase.migrateJob(AbstractOperatorRestoreTestBase.java:160)
> 2021-11-09T00:07:04.9987650Z Nov 09 00:07:04  at 
> org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase.testMigrationAndRestore(AbstractOperatorRestoreTestBase.java:114)
> 2021-11-09T00:07:04.9988579Z Nov 09 00:07:04  at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-11-09T00:07:04.9992663Z Nov 09 00:07:04  at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-11-09T00:07:04.9993717Z Nov 09 00:07:04  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-11-09T00:07:04.9994648Z Nov 09 00:07:04  at 
> java.base/java.lang.reflect.Method.invoke(Method.java:566)
> 2021-11-09T00:07:04.9995423Z Nov 09 00:07:04  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2021-11-09T00:07:04.9996143Z Nov 09 00:07:04  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-11-09T00:07:04.9996674Z Nov 09 00:07:04  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2021-11-09T00:07:04.9997227Z Nov 09 00:07:04  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-11-09T00:07:04.9998428Z Nov 09 00:07:04  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2021-11-09T00:07:04.9998938Z Nov 09 00:07:04  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2021-11-09T00:07:04.462Z Nov 09 00:07:04  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2021-11-09T00:07:04.946Z Nov 09 00:07:04  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2021-11-09T00:07:05.391Z Nov 09 00:07:04  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2021-11-09T00:07:05.900Z Nov 09 00:07:04  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 2021-11-09T00:07:05.0001404Z Nov 09 00:07:04  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 2021-11-09T00:07:05.0001897Z Nov 09 00:07:04  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 2021-11-09T00:07:05.0002437Z Nov 09 00:07:04  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 2021-11-09T00:07:05.0002914Z Nov 09 00:07:04  at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 2021-11-09T00:07:05.0003377Z Nov 09 00:07:04  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 2021-11-09T00:07:05.0003846Z Nov 09 00:07:04  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 2021-11-09T00:07:05.0004393Z Nov 09 00:07:04  at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> 2021-11-09T00:07:05.0004855Z Nov 09 00:07:04  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> 2021-11-09T00:07:05.0005310Z Nov 09 00:07:04  at 
> 

[jira] [Commented] (FLINK-24046) Refactor the relationship between PredefinedOptions and RocksDBConfigurableOptions

2021-11-10 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17441720#comment-17441720
 ] 

Yun Tang commented on FLINK-24046:
--

As discussed offline, I think we should keep 
{{DefaultConfigurableOptionsFactory}} but deprecate it, and give 
{{state.backend.rocksdb.options-factory}} no default value, so that Flink can 
warn users that {{DefaultConfigurableOptionsFactory}} has been deprecated when it 
detects that the user configured {{DefaultConfigurableOptionsFactory}} 
explicitly.
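
A rough sketch of such a check; only the class and option names come from the discussion, the surrounding wiring is hypothetical:

```java
/** Sketch: warn only when the user explicitly set the deprecated factory. */
public class OptionsFactoryDeprecationCheck {

    static final String DEPRECATED_FACTORY =
            "org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory";

    /**
     * @param configuredFactory value of state.backend.rocksdb.options-factory,
     *     or null when the option is unset (the proposed new default).
     * @return a warning message, or null when nothing needs to be reported.
     */
    static String checkConfiguredFactory(String configuredFactory) {
        if (DEPRECATED_FACTORY.equals(configuredFactory)) {
            return "state.backend.rocksdb.options-factory is set to the deprecated "
                    + DEPRECATED_FACTORY
                    + "; please migrate to the configuration-based RocksDB options.";
        }
        return null; // unset or a custom factory: nothing to warn about
    }

    public static void main(String[] args) {
        System.out.println(checkConfiguredFactory(DEPRECATED_FACTORY));
    }
}
```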

> Refactor the relationship between PredefinedOptions and 
> RocksDBConfigurableOptions
> --
>
> Key: FLINK-24046
> URL: https://issues.apache.org/jira/browse/FLINK-24046
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Zakelly Lan
>Priority: Major
> Fix For: 1.14.1
>
>
> RocksDBConfigurableOptions mainly focuses on the settings of DBOptions and 
> ColumnFamilyOptions. The original design of this class was to let users 
> configure RocksDB via configuration options instead of a programmatically 
> implemented RocksDBOptionsFactory.
> To keep the change minimal, the original options in RocksDBConfigurableOptions 
> have no default values, so that DefaultConfigurableOptionsFactory behaves 
> just as before.
> However, options without default values make their meaning unclear to users, 
> and we could consider changing the relationship between the two classes.




