[jira] [Commented] (FLINK-32086) Cleanup non-reported managed directory on exit of TM
[ https://issues.apache.org/jira/browse/FLINK-32086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839917#comment-17839917 ]

xiaogang zhou commented on FLINK-32086:
---------------------------------------

[~Zakelly] Yes, no problem.

> Cleanup non-reported managed directory on exit of TM
> ----------------------------------------------------
>
> Key: FLINK-32086
> URL: https://issues.apache.org/jira/browse/FLINK-32086
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Checkpointing, Runtime / State Backends
> Affects Versions: 1.18.0
> Reporter: Zakelly Lan
> Assignee: Zakelly Lan
> Priority: Major

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-32070) FLIP-306 Unified File Merging Mechanism for Checkpoints
[ https://issues.apache.org/jira/browse/FLINK-32070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834636#comment-17834636 ]

xiaogang zhou commented on FLINK-32070:
---------------------------------------

[~Zakelly] Yes, sounds good. Let me take a look.

> FLIP-306 Unified File Merging Mechanism for Checkpoints
> --------------------------------------------------------
>
> Key: FLINK-32070
> URL: https://issues.apache.org/jira/browse/FLINK-32070
> Project: Flink
> Issue Type: New Feature
> Components: Runtime / Checkpointing, Runtime / State Backends
> Reporter: Zakelly Lan
> Assignee: Zakelly Lan
> Priority: Major
> Fix For: 1.20.0
>
> The FLIP: [https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints]
>
> The creation of multiple checkpoint files can lead to a 'file flood' problem, in which a large number of files are written to the checkpoint storage in a short amount of time. This can cause issues in large clusters with high workloads, such as the creation and deletion of many files increasing the amount of file meta modification on DFS, leading to single-machine hotspot issues for meta maintainers (e.g. NameNode in HDFS). Additionally, the performance of object storage (e.g. Amazon S3 and Alibaba OSS) can significantly decrease when listing objects, which is necessary for object name de-duplication before creating an object, further affecting the performance of directory manipulation from the file system's perspective (see the [hadoop-aws module documentation|https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#:~:text=an%20intermediate%20state.-,Warning%20%232%3A%20Directories%20are%20mimicked,-The%20S3A%20clients], section 'Warning #2: Directories are mimicked').
>
> While many solutions have been proposed for individual types of state files (e.g. FLINK-11937 for keyed state (RocksDB) and FLINK-26803 for channel state), the file flood problems from each type of checkpoint file are similar and lack a systematic view and solution. Therefore, the goal of this FLIP is to establish a unified file merging mechanism to address the file flood problem during checkpoint creation for all types of state files, including keyed, non-keyed, channel, and changelog state. This will significantly improve the system stability and availability of fault tolerance in Flink.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Comment Edited] (FLINK-32070) FLIP-306 Unified File Merging Mechanism for Checkpoints
[ https://issues.apache.org/jira/browse/FLINK-32070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834609#comment-17834609 ]

xiaogang zhou edited comment on FLINK-32070 at 4/7/24 6:36 AM:
---------------------------------------------------------------

Is there any branch I can compile to do a POC? And if you are busy with the Flink 2.0 state work, I can also help with some of the work on FLIP-306. [~Zakelly]

was (Author: zhoujira86):
Is there any branch I can view to do a POC? And if you are busy with the Flink 2.0 state work, I can also help with some work on this issue. [~Zakelly]

> FLIP-306 Unified File Merging Mechanism for Checkpoints
> --------------------------------------------------------
>
> Key: FLINK-32070
> URL: https://issues.apache.org/jira/browse/FLINK-32070
> Project: Flink
> Issue Type: New Feature
> Components: Runtime / Checkpointing, Runtime / State Backends
> Reporter: Zakelly Lan
> Assignee: Zakelly Lan
> Priority: Major
> Fix For: 1.20.0
>
> The FLIP: [https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints]
>
> The creation of multiple checkpoint files can lead to a 'file flood' problem, in which a large number of files are written to the checkpoint storage in a short amount of time. This can cause issues in large clusters with high workloads, such as the creation and deletion of many files increasing the amount of file meta modification on DFS, leading to single-machine hotspot issues for meta maintainers (e.g. NameNode in HDFS). Additionally, the performance of object storage (e.g. Amazon S3 and Alibaba OSS) can significantly decrease when listing objects, which is necessary for object name de-duplication before creating an object, further affecting the performance of directory manipulation from the file system's perspective (see the [hadoop-aws module documentation|https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#:~:text=an%20intermediate%20state.-,Warning%20%232%3A%20Directories%20are%20mimicked,-The%20S3A%20clients], section 'Warning #2: Directories are mimicked').
>
> While many solutions have been proposed for individual types of state files (e.g. FLINK-11937 for keyed state (RocksDB) and FLINK-26803 for channel state), the file flood problems from each type of checkpoint file are similar and lack a systematic view and solution. Therefore, the goal of this FLIP is to establish a unified file merging mechanism to address the file flood problem during checkpoint creation for all types of state files, including keyed, non-keyed, channel, and changelog state. This will significantly improve the system stability and availability of fault tolerance in Flink.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-32070) FLIP-306 Unified File Merging Mechanism for Checkpoints
[ https://issues.apache.org/jira/browse/FLINK-32070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834609#comment-17834609 ]

xiaogang zhou commented on FLINK-32070:
---------------------------------------

Is there any branch I can view to do a POC? And if you are busy with the Flink 2.0 state work, I can also help with some work on this issue. [~Zakelly]

> FLIP-306 Unified File Merging Mechanism for Checkpoints
> --------------------------------------------------------
>
> Key: FLINK-32070
> URL: https://issues.apache.org/jira/browse/FLINK-32070
> Project: Flink
> Issue Type: New Feature
> Components: Runtime / Checkpointing, Runtime / State Backends
> Reporter: Zakelly Lan
> Assignee: Zakelly Lan
> Priority: Major
> Fix For: 1.20.0
>
> The FLIP: [https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints]
>
> The creation of multiple checkpoint files can lead to a 'file flood' problem, in which a large number of files are written to the checkpoint storage in a short amount of time. This can cause issues in large clusters with high workloads, such as the creation and deletion of many files increasing the amount of file meta modification on DFS, leading to single-machine hotspot issues for meta maintainers (e.g. NameNode in HDFS). Additionally, the performance of object storage (e.g. Amazon S3 and Alibaba OSS) can significantly decrease when listing objects, which is necessary for object name de-duplication before creating an object, further affecting the performance of directory manipulation from the file system's perspective (see the [hadoop-aws module documentation|https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#:~:text=an%20intermediate%20state.-,Warning%20%232%3A%20Directories%20are%20mimicked,-The%20S3A%20clients], section 'Warning #2: Directories are mimicked').
>
> While many solutions have been proposed for individual types of state files (e.g. FLINK-11937 for keyed state (RocksDB) and FLINK-26803 for channel state), the file flood problems from each type of checkpoint file are similar and lack a systematic view and solution. Therefore, the goal of this FLIP is to establish a unified file merging mechanism to address the file flood problem during checkpoint creation for all types of state files, including keyed, non-keyed, channel, and changelog state. This will significantly improve the system stability and availability of fault tolerance in Flink.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-32070) FLIP-306 Unified File Merging Mechanism for Checkpoints
[ https://issues.apache.org/jira/browse/FLINK-32070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834580#comment-17834580 ]

xiaogang zhou commented on FLINK-32070:
---------------------------------------

[~Zakelly] Hi, we hit a problem where Flink checkpoints with too many sst files cause heavy IOPS on HDFS. Can this issue help in that scenario?

> FLIP-306 Unified File Merging Mechanism for Checkpoints
> --------------------------------------------------------
>
> Key: FLINK-32070
> URL: https://issues.apache.org/jira/browse/FLINK-32070
> Project: Flink
> Issue Type: New Feature
> Components: Runtime / Checkpointing, Runtime / State Backends
> Reporter: Zakelly Lan
> Assignee: Zakelly Lan
> Priority: Major
> Fix For: 1.20.0
>
> The FLIP: [https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints]
>
> The creation of multiple checkpoint files can lead to a 'file flood' problem, in which a large number of files are written to the checkpoint storage in a short amount of time. This can cause issues in large clusters with high workloads, such as the creation and deletion of many files increasing the amount of file meta modification on DFS, leading to single-machine hotspot issues for meta maintainers (e.g. NameNode in HDFS). Additionally, the performance of object storage (e.g. Amazon S3 and Alibaba OSS) can significantly decrease when listing objects, which is necessary for object name de-duplication before creating an object, further affecting the performance of directory manipulation from the file system's perspective (see the [hadoop-aws module documentation|https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#:~:text=an%20intermediate%20state.-,Warning%20%232%3A%20Directories%20are%20mimicked,-The%20S3A%20clients], section 'Warning #2: Directories are mimicked').
>
> While many solutions have been proposed for individual types of state files (e.g. FLINK-11937 for keyed state (RocksDB) and FLINK-26803 for channel state), the file flood problems from each type of checkpoint file are similar and lack a systematic view and solution. Therefore, the goal of this FLIP is to establish a unified file merging mechanism to address the file flood problem during checkpoint creation for all types of state files, including keyed, non-keyed, channel, and changelog state. This will significantly improve the system stability and availability of fault tolerance in Flink.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-34976) LD_PRELOAD environment may not be effective after su to flink user
[ https://issues.apache.org/jira/browse/FLINK-34976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17832762#comment-17832762 ]

xiaogang zhou commented on FLINK-34976:
---------------------------------------

[~yunta] OK, thanks for the clarification.

> LD_PRELOAD environment may not be effective after su to flink user
> -------------------------------------------------------------------
>
> Key: FLINK-34976
> URL: https://issues.apache.org/jira/browse/FLINK-34976
> Project: Flink
> Issue Type: New Feature
> Components: flink-docker
> Affects Versions: 1.19.0
> Reporter: xiaogang zhou
> Priority: Major
>
> I am not sure if LD_PRELOAD still takes effect after drop_privs_cmd. Should we create a .bashrc file in the home directory of the flink user and export LD_PRELOAD for the flink user?
>
> [https://github.com/apache/flink-docker/blob/627987997ca7ec86bcc3d80b26df58aa595b91af/1.17/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L92]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Closed] (FLINK-34976) LD_PRELOAD environment may not be effective after su to flink user
[ https://issues.apache.org/jira/browse/FLINK-34976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xiaogang zhou closed FLINK-34976.
---------------------------------
    Resolution: Invalid

> LD_PRELOAD environment may not be effective after su to flink user
> -------------------------------------------------------------------
>
> Key: FLINK-34976
> URL: https://issues.apache.org/jira/browse/FLINK-34976
> Project: Flink
> Issue Type: New Feature
> Components: flink-docker
> Affects Versions: 1.19.0
> Reporter: xiaogang zhou
> Priority: Major
>
> I am not sure if LD_PRELOAD still takes effect after drop_privs_cmd. Should we create a .bashrc file in the home directory of the flink user and export LD_PRELOAD for the flink user?
>
> [https://github.com/apache/flink-docker/blob/627987997ca7ec86bcc3d80b26df58aa595b91af/1.17/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L92]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-34976) LD_PRELOAD environment may not be effective after su to flink user
[ https://issues.apache.org/jira/browse/FLINK-34976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xiaogang zhou updated FLINK-34976:
----------------------------------
    Description:
I am not sure if LD_PRELOAD still takes effect after drop_privs_cmd. Do we need to create a .bashrc file in the home directory of the flink user and export LD_PRELOAD for the flink user?

[https://github.com/apache/flink-docker/blob/627987997ca7ec86bcc3d80b26df58aa595b91af/1.17/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L92]

  was:
I am not sure if LD_PRELOAD still takes effect after drop_privs_cmd. Do we need to create a .bashrc file in the home directory of the flink user?

[https://github.com/apache/flink-docker/blob/627987997ca7ec86bcc3d80b26df58aa595b91af/1.17/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L92]

> LD_PRELOAD environment may not be effective after su to flink user
> -------------------------------------------------------------------
>
> Key: FLINK-34976
> URL: https://issues.apache.org/jira/browse/FLINK-34976
> Project: Flink
> Issue Type: New Feature
> Components: flink-docker
> Affects Versions: 1.19.0
> Reporter: xiaogang zhou
> Priority: Major
>
> I am not sure if LD_PRELOAD still takes effect after drop_privs_cmd. Do we need to create a .bashrc file in the home directory of the flink user and export LD_PRELOAD for the flink user?
>
> [https://github.com/apache/flink-docker/blob/627987997ca7ec86bcc3d80b26df58aa595b91af/1.17/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L92]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-34976) LD_PRELOAD environment may not be effective after su to flink user
[ https://issues.apache.org/jira/browse/FLINK-34976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xiaogang zhou updated FLINK-34976:
----------------------------------
    Description:
I am not sure if LD_PRELOAD still takes effect after drop_privs_cmd. Should we create a .bashrc file in the home directory of the flink user and export LD_PRELOAD for the flink user?

[https://github.com/apache/flink-docker/blob/627987997ca7ec86bcc3d80b26df58aa595b91af/1.17/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L92]

  was:
I am not sure if LD_PRELOAD still takes effect after drop_privs_cmd. Do we need to create a .bashrc file in the home directory of the flink user and export LD_PRELOAD for the flink user?

[https://github.com/apache/flink-docker/blob/627987997ca7ec86bcc3d80b26df58aa595b91af/1.17/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L92]

> LD_PRELOAD environment may not be effective after su to flink user
> -------------------------------------------------------------------
>
> Key: FLINK-34976
> URL: https://issues.apache.org/jira/browse/FLINK-34976
> Project: Flink
> Issue Type: New Feature
> Components: flink-docker
> Affects Versions: 1.19.0
> Reporter: xiaogang zhou
> Priority: Major
>
> I am not sure if LD_PRELOAD still takes effect after drop_privs_cmd. Should we create a .bashrc file in the home directory of the flink user and export LD_PRELOAD for the flink user?
>
> [https://github.com/apache/flink-docker/blob/627987997ca7ec86bcc3d80b26df58aa595b91af/1.17/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L92]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-34976) LD_PRELOAD environment may not be effective after su to flink user
[ https://issues.apache.org/jira/browse/FLINK-34976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xiaogang zhou updated FLINK-34976:
----------------------------------
    Description:
I am not sure if LD_PRELOAD still takes effect after drop_privs_cmd. Do we need to create a .bashrc file in the home directory of the flink user?

[https://github.com/apache/flink-docker/blob/627987997ca7ec86bcc3d80b26df58aa595b91af/1.17/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L92]

> LD_PRELOAD environment may not be effective after su to flink user
> -------------------------------------------------------------------
>
> Key: FLINK-34976
> URL: https://issues.apache.org/jira/browse/FLINK-34976
> Project: Flink
> Issue Type: New Feature
> Components: flink-docker
> Affects Versions: 1.19.0
> Reporter: xiaogang zhou
> Priority: Major
>
> I am not sure if LD_PRELOAD still takes effect after drop_privs_cmd. Do we need to create a .bashrc file in the home directory of the flink user?
>
> [https://github.com/apache/flink-docker/blob/627987997ca7ec86bcc3d80b26df58aa595b91af/1.17/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L92]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-34976) LD_PRELOAD environment may not be effective after su to flink user
xiaogang zhou created FLINK-34976:
----------------------------------

             Summary: LD_PRELOAD environment may not be effective after su to flink user
                 Key: FLINK-34976
                 URL: https://issues.apache.org/jira/browse/FLINK-34976
             Project: Flink
          Issue Type: New Feature
          Components: flink-docker
    Affects Versions: 1.19.0
            Reporter: xiaogang zhou

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-17593) Support arbitrary recovery mechanism for PartFileWriter
[ https://issues.apache.org/jira/browse/FLINK-17593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814195#comment-17814195 ]

xiaogang zhou commented on FLINK-17593:
---------------------------------------

[~maguowei] Hi, for a filesystem that does not support a recoverable writer (like the OSS filesystem), it looks like we cannot use the filesystem connector?

> Support arbitrary recovery mechanism for PartFileWriter
> ---------------------------------------------------------
>
> Key: FLINK-17593
> URL: https://issues.apache.org/jira/browse/FLINK-17593
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / FileSystem
> Reporter: Yun Gao
> Assignee: Guowei Ma
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.11.0
>
> Currently, Bucket relies directly on the _RecoverableOutputStream_ provided by FileSystem to snapshot and recover the in-progress part file for all PartFileWriter implementations. This requires that the PartFileWriter be based on the OutputStream.
>
> To support the path-based PartFileWriter required by the Hive sink, we first need to abstract the snapshotting mechanism of the PartFileWriter and make RecoverableOutputStream one type of implementation, so that we can decouple the PartFileWriter from the output streams.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
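For readers following this thread: whether a given Flink FileSystem supports recoverable writers can be probed directly, since FileSystem#createRecoverableWriter() throws UnsupportedOperationException on filesystems without persist/recover semantics. The following is a minimal sketch of such a probe; the OSS path is only an illustrative example, not taken from the thread.

{code:java}
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;

public class RecoverableWriterProbe {
    public static void main(String[] args) throws Exception {
        // Hypothetical URI; replace with the filesystem you want to test.
        Path base = new Path("oss://my-bucket/sink-test");
        FileSystem fs = base.getFileSystem();
        try {
            // Throws UnsupportedOperationException when the filesystem
            // cannot persist and recover in-progress output streams.
            RecoverableWriter writer = fs.createRecoverableWriter();
            System.out.println("Recoverable writer supported: " + writer.getClass().getName());
        } catch (UnsupportedOperationException e) {
            System.out.println("No recoverable writer for this filesystem: " + e.getMessage());
        }
    }
}
{code}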
[jira] [Updated] (FLINK-33728) Do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xiaogang zhou updated FLINK-33728:
----------------------------------
    Summary: Do not rewatch when KubernetesResourceManagerDriver watch fail  (was: do not rewatch when KubernetesResourceManagerDriver watch fail)

> Do not rewatch when KubernetesResourceManagerDriver watch fail
> ---------------------------------------------------------------
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
> Issue Type: New Feature
> Components: Deployment / Kubernetes
> Reporter: xiaogang zhou
> Assignee: xiaogang zhou
> Priority: Major
> Labels: pull-request-available
>
> I met a massive production problem when Kubernetes etcd responded slowly. After Kubernetes recovered an hour later, thousands of Flink jobs using the KubernetesResourceManagerDriver re-watched upon receiving ResourceVersionTooOld, which put great pressure on the API server and made the API server fail again...
>
> I am not sure whether it is necessary to call getResourceEventHandler().onError(throwable) in the PodCallbackHandlerImpl#handleError method.
>
> We can just ignore the disconnection of the watching process and try to re-watch once a new requestResource is called, and we can leverage the akka heartbeat timeout to discover TM failures, just like YARN mode does.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17806640#comment-17806640 ]

xiaogang zhou commented on FLINK-33728:
---------------------------------------

[~xtsong] [~wangyang0918] OK, glad to hear that. Would you please assign the ticket to me?

> do not rewatch when KubernetesResourceManagerDriver watch fail
> ---------------------------------------------------------------
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
> Issue Type: New Feature
> Components: Deployment / Kubernetes
> Reporter: xiaogang zhou
> Priority: Major
> Labels: pull-request-available
>
> I met a massive production problem when Kubernetes etcd responded slowly. After Kubernetes recovered an hour later, thousands of Flink jobs using the KubernetesResourceManagerDriver re-watched upon receiving ResourceVersionTooOld, which put great pressure on the API server and made the API server fail again...
>
> I am not sure whether it is necessary to call getResourceEventHandler().onError(throwable) in the PodCallbackHandlerImpl#handleError method.
>
> We can just ignore the disconnection of the watching process and try to re-watch once a new requestResource is called, and we can leverage the akka heartbeat timeout to discover TM failures, just like YARN mode does.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-33741) Expose Rocksdb Histogram statistics in Flink metrics
[ https://issues.apache.org/jira/browse/FLINK-33741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xiaogang zhou updated FLINK-33741:
----------------------------------
    Summary: Expose Rocksdb Histogram statistics in Flink metrics  (was: Exposed Rocksdb Histogram statistics in Flink metrics)

> Expose Rocksdb Histogram statistics in Flink metrics
> ------------------------------------------------------
>
> Key: FLINK-33741
> URL: https://issues.apache.org/jira/browse/FLINK-33741
> Project: Flink
> Issue Type: New Feature
> Reporter: xiaogang zhou
> Assignee: xiaogang zhou
> Priority: Major
>
> I'd like to expose RocksDB histogram metrics such as db_get and db_write to enable troubleshooting.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
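As a rough illustration of the idea in this ticket: RocksDB's Java API already exposes histogram snapshots via Statistics#getHistogramData(HistogramType), which can be bridged to Flink gauges. The helper below is a hypothetical sketch, not the implementation that eventually landed in Flink; the metric group layout and names are assumptions.

{code:java}
import org.apache.flink.metrics.MetricGroup;
import org.rocksdb.HistogramType;
import org.rocksdb.Statistics;

/** Hypothetical helper: registers RocksDB histogram percentiles as Flink gauges. */
public class RocksDBHistogramMetrics {

    public static void register(MetricGroup group, Statistics statistics) {
        for (HistogramType type : new HistogramType[] {HistogramType.DB_GET, HistogramType.DB_WRITE}) {
            MetricGroup sub = group.addGroup(type.name().toLowerCase());
            // getHistogramData() takes a fresh snapshot of the native histogram
            // on every gauge read, so the reported values stay current.
            sub.gauge("avg", () -> statistics.getHistogramData(type).getAverage());
            sub.gauge("p95", () -> statistics.getHistogramData(type).getPercentile95());
            sub.gauge("p99", () -> statistics.getHistogramData(type).getPercentile99());
        }
    }
}
{code}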
[jira] [Updated] (FLINK-33741) Exposed Rocksdb Histogram statistics in Flink metrics
[ https://issues.apache.org/jira/browse/FLINK-33741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xiaogang zhou updated FLINK-33741:
----------------------------------
    Description:
I'd like to expose RocksDB histogram metrics such as db_get and db_write to enable troubleshooting.

  was:
I think we can also parse the multi-line string of the RocksDB statistics.

{code:java}
/**
 * DB implementations can export properties about their state
 * via this method on a per column family level.
 *
 * If {@code property} is a valid property understood by this DB
 * implementation, fills {@code value} with its current value and
 * returns true. Otherwise returns false.
 *
 * Valid property names include:
 *
 *   "rocksdb.num-files-at-level<N>" - return the number of files at
 *       level <N>, where <N> is an ASCII representation of a level
 *       number (e.g. "0").
 *   "rocksdb.stats" - returns a multi-line string that describes statistics
 *       about the internal operation of the DB.
 *   "rocksdb.sstables" - returns a multi-line string that describes all
 *       of the sstables that make up the db contents.
 *
 * @param columnFamilyHandle {@link org.rocksdb.ColumnFamilyHandle}
 *     instance, or null for the default column family.
 * @param property to be fetched. See above for examples.
 * @return property value
 *
 * @throws RocksDBException thrown if error happens in underlying
 *     native library.
 */
public String getProperty(
        /* @Nullable */ final ColumnFamilyHandle columnFamilyHandle,
        final String property) throws RocksDBException {
{code}

Then we can directly export these response-time latency numbers in metrics.

I'd like to introduce two RocksDB-statistics-related configuration options. Then we can customize the stats:

{code:java}
Statistics s = new Statistics();
s.setStatsLevel(StatsLevel.EXCEPT_TIME_FOR_MUTEX);
currentOptions.setStatsDumpPeriodSec(internalGetOption(RocksDBConfigurableOptions.STATISTIC_DUMP_PERIOD))
        .setStatistics(s);
{code}

> Exposed Rocksdb Histogram statistics in Flink metrics
> -------------------------------------------------------
>
> Key: FLINK-33741
> URL: https://issues.apache.org/jira/browse/FLINK-33741
> Project: Flink
> Issue Type: New Feature
> Reporter: xiaogang zhou
> Assignee: xiaogang zhou
> Priority: Major
>
> I'd like to expose RocksDB histogram metrics such as db_get and db_write to enable troubleshooting.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-33741) Exposed Rocksdb Histogram statistics in Flink metrics
[ https://issues.apache.org/jira/browse/FLINK-33741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xiaogang zhou updated FLINK-33741:
----------------------------------
    Summary: Exposed Rocksdb Histogram statistics in Flink metrics  (was: Exposed Rocksdb statistics in Flink metrics and introduce 2 Rocksdb statistic related configuration)

> Exposed Rocksdb Histogram statistics in Flink metrics
> -------------------------------------------------------
>
> Key: FLINK-33741
> URL: https://issues.apache.org/jira/browse/FLINK-33741
> Project: Flink
> Issue Type: New Feature
> Reporter: xiaogang zhou
> Assignee: xiaogang zhou
> Priority: Major
>
> I think we can also parse the multi-line string of the RocksDB statistics.
>
> {code:java}
> /**
>  * DB implementations can export properties about their state
>  * via this method on a per column family level.
>  *
>  * If {@code property} is a valid property understood by this DB
>  * implementation, fills {@code value} with its current value and
>  * returns true. Otherwise returns false.
>  *
>  * Valid property names include:
>  *
>  *   "rocksdb.num-files-at-level<N>" - return the number of files at
>  *       level <N>, where <N> is an ASCII representation of a level
>  *       number (e.g. "0").
>  *   "rocksdb.stats" - returns a multi-line string that describes statistics
>  *       about the internal operation of the DB.
>  *   "rocksdb.sstables" - returns a multi-line string that describes all
>  *       of the sstables that make up the db contents.
>  *
>  * @param columnFamilyHandle {@link org.rocksdb.ColumnFamilyHandle}
>  *     instance, or null for the default column family.
>  * @param property to be fetched. See above for examples.
>  * @return property value
>  *
>  * @throws RocksDBException thrown if error happens in underlying
>  *     native library.
>  */
> public String getProperty(
>         /* @Nullable */ final ColumnFamilyHandle columnFamilyHandle,
>         final String property) throws RocksDBException {
> {code}
>
> Then we can directly export these response-time latency numbers in metrics.
>
> I'd like to introduce two RocksDB-statistics-related configuration options. Then we can customize the stats:
>
> {code:java}
> Statistics s = new Statistics();
> s.setStatsLevel(StatsLevel.EXCEPT_TIME_FOR_MUTEX);
> currentOptions.setStatsDumpPeriodSec(internalGetOption(RocksDBConfigurableOptions.STATISTIC_DUMP_PERIOD))
>         .setStatistics(s);
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Comment Edited] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804638#comment-17804638 ]

xiaogang zhou edited comment on FLINK-33728 at 1/9/24 8:53 AM:
---------------------------------------------------------------

[~xtsong] In a default Flink setting, when the KubernetesClient disconnects from the Kubernetes API server, it will try to reconnect indefinitely, since kubernetes.watch.reconnectLimit is -1. But the KubernetesClient treats ResourceVersionTooOld as a special exception that escapes the normal reconnect loop. It then causes Flink's FlinkKubeClient to retry the connection kubernetes.transactional-operation.max-retries times, and these retries have no interval between them. If the watcher does not recover, the JM will kill itself.

So I think the problem we are trying to solve is not only avoiding massive numbers of Flink jobs re-creating watches at the same time, but also allowing Flink to continue running even when the Kubernetes API server is in a disordered state, since most of the time Flink TMs have no dependency on the API server.

If you think it is not acceptable to recover the watcher only when requesting resources, another possible way is to retry the pod watch periodically. WDYT? :)

was (Author: zhoujira86):
[~xtsong] In a default Flink setting, when the KubernetesClient disconnects from the Kubernetes API server, it will try to reconnect indefinitely, since kubernetes.watch.reconnectLimit is -1. But the KubernetesClient treats ResourceVersionTooOld as a special exception that escapes the normal reconnect loop. It then causes Flink's FlinkKubeClient to retry the connection kubernetes.transactional-operation.max-retries times, and these retries have no interval between them. If the watcher does not recover, the JM will kill itself.

So I think the problem we are trying to solve is not only avoiding massive numbers of Flink jobs re-creating watches at the same time, but also allowing Flink to continue running even when the Kubernetes API server is in a disordered state, since most of the time Flink TMs do not need to be bothered by a bad API server.

If you think it is not acceptable to recover the watcher only when requesting resources, another possible way is to retry the pod watch periodically. WDYT? :)

> do not rewatch when KubernetesResourceManagerDriver watch fail
> ---------------------------------------------------------------
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
> Issue Type: New Feature
> Components: Deployment / Kubernetes
> Reporter: xiaogang zhou
> Priority: Major
> Labels: pull-request-available
>
> I met a massive production problem when Kubernetes etcd responded slowly. After Kubernetes recovered an hour later, thousands of Flink jobs using the KubernetesResourceManagerDriver re-watched upon receiving ResourceVersionTooOld, which put great pressure on the API server and made the API server fail again...
>
> I am not sure whether it is necessary to call getResourceEventHandler().onError(throwable) in the PodCallbackHandlerImpl#handleError method.
>
> We can just ignore the disconnection of the watching process and try to re-watch once a new requestResource is called, and we can leverage the akka heartbeat timeout to discover TM failures, just like YARN mode does.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Comment Edited] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804638#comment-17804638 ]

xiaogang zhou edited comment on FLINK-33728 at 1/9/24 8:52 AM:
---------------------------------------------------------------

[~xtsong] In a default Flink setting, when the KubernetesClient disconnects from the Kubernetes API server, it will try to reconnect indefinitely, since kubernetes.watch.reconnectLimit is -1. But the KubernetesClient treats ResourceVersionTooOld as a special exception that escapes the normal reconnect loop. It then causes Flink's FlinkKubeClient to retry the connection kubernetes.transactional-operation.max-retries times, and these retries have no interval between them. If the watcher does not recover, the JM will kill itself.

So I think the problem we are trying to solve is not only avoiding massive numbers of Flink jobs re-creating watches at the same time, but also allowing Flink to continue running even when the Kubernetes API server is in a disordered state, since most of the time Flink TMs do not need to be bothered by a bad API server.

If you think it is not acceptable to recover the watcher only when requesting resources, another possible way is to retry the pod watch periodically. WDYT? :)

was (Author: zhoujira86):
[~xtsong] In a default Flink setting, when the KubernetesClient disconnects from the Kubernetes API server, it will try to reconnect indefinitely, since kubernetes.watch.reconnectLimit is -1. But the KubernetesClient treats ResourceVersionTooOld as a special exception that escapes the normal reconnect loop. It then causes Flink's FlinkKubeClient to retry the connection kubernetes.transactional-operation.max-retries times. If the watcher does not recover, the JM will kill itself.

So I think the problem we are trying to solve is not only avoiding massive numbers of Flink jobs re-creating watches at the same time, but also allowing Flink to continue running even when the Kubernetes API server is in a disordered state, since most of the time Flink TMs do not need to be bothered by a bad API server.

If you think it is not acceptable to recover the watcher only when requesting resources, another possible way is to retry the pod watch periodically. WDYT? :)

> do not rewatch when KubernetesResourceManagerDriver watch fail
> ---------------------------------------------------------------
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
> Issue Type: New Feature
> Components: Deployment / Kubernetes
> Reporter: xiaogang zhou
> Priority: Major
> Labels: pull-request-available
>
> I met a massive production problem when Kubernetes etcd responded slowly. After Kubernetes recovered an hour later, thousands of Flink jobs using the KubernetesResourceManagerDriver re-watched upon receiving ResourceVersionTooOld, which put great pressure on the API server and made the API server fail again...
>
> I am not sure whether it is necessary to call getResourceEventHandler().onError(throwable) in the PodCallbackHandlerImpl#handleError method.
>
> We can just ignore the disconnection of the watching process and try to re-watch once a new requestResource is called, and we can leverage the akka heartbeat timeout to discover TM failures, just like YARN mode does.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804638#comment-17804638 ]

xiaogang zhou commented on FLINK-33728:
---------------------------------------

[~xtsong] In a default Flink setting, when the KubernetesClient disconnects from the Kubernetes API server, it will try to reconnect indefinitely, since kubernetes.watch.reconnectLimit is -1. But the KubernetesClient treats ResourceVersionTooOld as a special exception that escapes the normal reconnect loop. It then causes Flink's FlinkKubeClient to retry the connection kubernetes.transactional-operation.max-retries times. If the watcher does not recover, the JM will kill itself.

So I think the problem we are trying to solve is not only avoiding massive numbers of Flink jobs re-creating watches at the same time, but also allowing Flink to continue running even when the Kubernetes API server is in a disordered state, since most of the time Flink TMs do not need to be bothered by a bad API server.

If you think it is not acceptable to recover the watcher only when requesting resources, another possible way is to retry the pod watch periodically. WDYT? :)

> do not rewatch when KubernetesResourceManagerDriver watch fail
> ---------------------------------------------------------------
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
> Issue Type: New Feature
> Components: Deployment / Kubernetes
> Reporter: xiaogang zhou
> Priority: Major
> Labels: pull-request-available
>
> I met a massive production problem when Kubernetes etcd responded slowly. After Kubernetes recovered an hour later, thousands of Flink jobs using the KubernetesResourceManagerDriver re-watched upon receiving ResourceVersionTooOld, which put great pressure on the API server and made the API server fail again...
>
> I am not sure whether it is necessary to call getResourceEventHandler().onError(throwable) in the PodCallbackHandlerImpl#handleError method.
>
> We can just ignore the disconnection of the watching process and try to re-watch once a new requestResource is called, and we can leverage the akka heartbeat timeout to discover TM failures, just like YARN mode does.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804553#comment-17804553 ]

xiaogang zhou commented on FLINK-33728:
---------------------------------------

[~mapohl] I think your concern is really important, and my earlier statement was not clear enough. After your reminder, I'd like to change it to: we can just ignore the disconnection of the watching process {color:#FF0000}if there is no pending request{color}, and try to re-watch once a new requestResource is called. We can also choose to fail all pending CompletableFutures, and then [requestWorkerIfRequired|https://github.com/apache/flink/blob/2b9b9859253698c3c90ca420f10975e27e6c52d4/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java#L332] will request the resources again, which will trigger the re-watch. WDYT [~mapohl] [~xtsong]

> do not rewatch when KubernetesResourceManagerDriver watch fail
> ---------------------------------------------------------------
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
> Issue Type: New Feature
> Components: Deployment / Kubernetes
> Reporter: xiaogang zhou
> Priority: Major
> Labels: pull-request-available
>
> I met a massive production problem when Kubernetes etcd responded slowly. After Kubernetes recovered an hour later, thousands of Flink jobs using the KubernetesResourceManagerDriver re-watched upon receiving ResourceVersionTooOld, which put great pressure on the API server and made the API server fail again...
>
> I am not sure whether it is necessary to call getResourceEventHandler().onError(throwable) in the PodCallbackHandlerImpl#handleError method.
>
> We can just ignore the disconnection of the watching process and try to re-watch once a new requestResource is called, and we can leverage the akka heartbeat timeout to discover TM failures, just like YARN mode does.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
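To make the proposal above concrete, here is a hypothetical sketch of the "tolerate the error unless a request is pending" behavior. The class and method names are illustrative only and do not mirror the actual KubernetesResourceManagerDriver or PodCallbackHandlerImpl code.

{code:java}
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of the proposed lazy re-watch behavior.
class LazyWatchHandler {

    private final Queue<CompletableFuture<String>> pendingPodRequests = new ConcurrentLinkedQueue<>();
    private volatile boolean watchBroken = false;

    // Called when the pod watch fails (e.g. with ResourceVersionTooOld).
    void handleWatchError(Throwable t) {
        if (pendingPodRequests.isEmpty()) {
            // No pending request: tolerate the disconnection; TM loss is
            // still detected via the heartbeat timeout, as in YARN mode.
            watchBroken = true;
        } else {
            // Fail the pending futures so the requestWorkerIfRequired path
            // requests the resources again, which re-triggers the watch.
            CompletableFuture<String> f;
            while ((f = pendingPodRequests.poll()) != null) {
                f.completeExceptionally(t);
            }
        }
    }

    // Called on each new resource request; re-creates the watch lazily.
    CompletableFuture<String> requestPod() {
        if (watchBroken) {
            recreateWatch();
            watchBroken = false;
        }
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingPodRequests.add(future);
        return future;
    }

    private void recreateWatch() {
        // Re-establish the pod watch against the API server here.
    }
}
{code}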
[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804214#comment-17804214 ]

xiaogang zhou commented on FLINK-33728:
---------------------------------------

Hi Matthias, I hope you have recovered and enjoyed a wonderful holiday :). Can we have a discussion on my proposal? [~mapohl]

> do not rewatch when KubernetesResourceManagerDriver watch fail
> ---------------------------------------------------------------
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
> Issue Type: New Feature
> Components: Deployment / Kubernetes
> Reporter: xiaogang zhou
> Priority: Major
> Labels: pull-request-available
>
> I met a massive production problem when Kubernetes etcd responded slowly. After Kubernetes recovered an hour later, thousands of Flink jobs using the KubernetesResourceManagerDriver re-watched upon receiving ResourceVersionTooOld, which put great pressure on the API server and made the API server fail again...
>
> I am not sure whether it is necessary to call getResourceEventHandler().onError(throwable) in the PodCallbackHandlerImpl#handleError method.
>
> We can just ignore the disconnection of the watching process and try to re-watch once a new requestResource is called, and we can leverage the akka heartbeat timeout to discover TM failures, just like YARN mode does.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798797#comment-17798797 ]

xiaogang zhou commented on FLINK-33728:
---------------------------------------

[~mapohl] Hi Matthias, would you please let me know what additional tests are needed to show that my proposal can move forward?

> do not rewatch when KubernetesResourceManagerDriver watch fail
> ---------------------------------------------------------------
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
> Issue Type: New Feature
> Components: Deployment / Kubernetes
> Reporter: xiaogang zhou
> Priority: Major
> Labels: pull-request-available
>
> I met a massive production problem when Kubernetes etcd responded slowly. After Kubernetes recovered an hour later, thousands of Flink jobs using the KubernetesResourceManagerDriver re-watched upon receiving ResourceVersionTooOld, which put great pressure on the API server and made the API server fail again...
>
> I am not sure whether it is necessary to call getResourceEventHandler().onError(throwable) in the PodCallbackHandlerImpl#handleError method.
>
> We can just ignore the disconnection of the watching process and try to re-watch once a new requestResource is called, and we can leverage the akka heartbeat timeout to discover TM failures, just like YARN mode does.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17795571#comment-17795571 ]

xiaogang zhou commented on FLINK-33728:
---------------------------------------

Hi [~mapohl], thanks for the comment above. Sorry for my poor written English :P, but I think your re-clarification is exactly what I am proposing. I'd like to introduce a lazy re-initialization of the watch mechanism, which tolerates a disconnection of the watch until a new pod is requested.

I think your concern is how we detect a TM loss without an active watcher. I have tested my change in a real K8s environment: with a disconnected watcher, I killed a TM pod, and after no more than 50s the task restarted with an exception:

{code:java}
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id flink-6168d34cf9d3a5d31ad8bb02bce6a370-taskmanager-1-8 timed out.
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1306)
    at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitC
{code}

Moreover, YARN does not have a watcher mechanism either, so Flink scheduled on YARN also relies on the heartbeat timeout mechanism. An active re-watching strategy can put great pressure on the API server, especially in early versions that do not set resource version zero in the watch-list request.

> do not rewatch when KubernetesResourceManagerDriver watch fail
> ---------------------------------------------------------------
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
> Issue Type: New Feature
> Components: Deployment / Kubernetes
> Reporter: xiaogang zhou
> Priority: Major
> Labels: pull-request-available
>
> I met a massive production problem when Kubernetes etcd responded slowly. After Kubernetes recovered an hour later, thousands of Flink jobs using the KubernetesResourceManagerDriver re-watched upon receiving ResourceVersionTooOld, which put great pressure on the API server and made the API server fail again...
>
> I am not sure whether it is necessary to call getResourceEventHandler().onError(throwable) in the PodCallbackHandlerImpl#handleError method.
>
> We can just ignore the disconnection of the watching process and try to re-watch once a new requestResource is called, and we can leverage the akka heartbeat timeout to discover TM failures, just like YARN mode does.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-33741) Exposed Rocksdb statistics in Flink metrics and introduce 2 Rocksdb statistic related configuration
[ https://issues.apache.org/jira/browse/FLINK-33741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xiaogang zhou updated FLINK-33741:
----------------------------------
    Summary: Exposed Rocksdb statistics in Flink metrics and introduce 2 Rocksdb statistic related configuration  (was: Introduce stat dump period and statsLevel configuration)

> Exposed Rocksdb statistics in Flink metrics and introduce 2 Rocksdb statistic related configuration
> -----------------------------------------------------------------------------------------------------
>
> Key: FLINK-33741
> URL: https://issues.apache.org/jira/browse/FLINK-33741
> Project: Flink
> Issue Type: New Feature
> Reporter: xiaogang zhou
> Priority: Major
>
> I'd like to introduce two RocksDB-statistics-related configuration options. Then we can customize the stats:
>
> {code:java}
> Statistics s = new Statistics();
> s.setStatsLevel(StatsLevel.EXCEPT_TIME_FOR_MUTEX);
> currentOptions.setStatsDumpPeriodSec(internalGetOption(RocksDBConfigurableOptions.STATISTIC_DUMP_PERIOD))
>         .setStatistics(s);
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-33741) Exposed Rocksdb statistics in Flink metrics and introduce 2 Rocksdb statistic related configuration
[ https://issues.apache.org/jira/browse/FLINK-33741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xiaogang zhou updated FLINK-33741:
----------------------------------
    Description:
I think we can also parse the multi-line string of the RocksDB statistics.

{code:java}
/**
 * DB implementations can export properties about their state
 * via this method on a per column family level.
 *
 * If {@code property} is a valid property understood by this DB
 * implementation, fills {@code value} with its current value and
 * returns true. Otherwise returns false.
 *
 * Valid property names include:
 *
 *   "rocksdb.num-files-at-level<N>" - return the number of files at
 *       level <N>, where <N> is an ASCII representation of a level
 *       number (e.g. "0").
 *   "rocksdb.stats" - returns a multi-line string that describes statistics
 *       about the internal operation of the DB.
 *   "rocksdb.sstables" - returns a multi-line string that describes all
 *       of the sstables that make up the db contents.
 *
 * @param columnFamilyHandle {@link org.rocksdb.ColumnFamilyHandle}
 *     instance, or null for the default column family.
 * @param property to be fetched. See above for examples.
 * @return property value
 *
 * @throws RocksDBException thrown if error happens in underlying
 *     native library.
 */
public String getProperty(
        /* @Nullable */ final ColumnFamilyHandle columnFamilyHandle,
        final String property) throws RocksDBException {
{code}

Then we can directly export these response-time latency numbers in metrics.

I'd like to introduce two RocksDB-statistics-related configuration options. Then we can customize the stats:

{code:java}
Statistics s = new Statistics();
s.setStatsLevel(StatsLevel.EXCEPT_TIME_FOR_MUTEX);
currentOptions.setStatsDumpPeriodSec(internalGetOption(RocksDBConfigurableOptions.STATISTIC_DUMP_PERIOD))
        .setStatistics(s);
{code}

  was:
I'd like to introduce two RocksDB-statistics-related configuration options. Then we can customize the stats:

{code:java}
Statistics s = new Statistics();
s.setStatsLevel(StatsLevel.EXCEPT_TIME_FOR_MUTEX);
currentOptions.setStatsDumpPeriodSec(internalGetOption(RocksDBConfigurableOptions.STATISTIC_DUMP_PERIOD))
        .setStatistics(s);
{code}

> Exposed Rocksdb statistics in Flink metrics and introduce 2 Rocksdb statistic related configuration
> -----------------------------------------------------------------------------------------------------
>
> Key: FLINK-33741
> URL: https://issues.apache.org/jira/browse/FLINK-33741
> Project: Flink
> Issue Type: New Feature
> Reporter: xiaogang zhou
> Priority: Major
>
> I think we can also parse the multi-line string of the RocksDB statistics, using getProperty("rocksdb.stats"), which returns a multi-line string that describes statistics about the internal operation of the DB. Then we can directly export these response-time latency numbers in metrics.
>
> I'd like to introduce two RocksDB-statistics-related configuration options. Then we can customize the stats:
>
> {code:java}
> Statistics s = new Statistics();
> s.setStatsLevel(StatsLevel.EXCEPT_TIME_FOR_MUTEX);
> currentOptions.setStatsDumpPeriodSec(internalGetOption(RocksDBConfigurableOptions.STATISTIC_DUMP_PERIOD))
>         .setStatistics(s);
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-33741) Introduce stat dump period and statsLevel configuration
[ https://issues.apache.org/jira/browse/FLINK-33741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xiaogang zhou updated FLINK-33741:
----------------------------------
    Summary: Introduce stat dump period and statsLevel configuration  (was: introduce stat dump period and statsLevel configuration)

> Introduce stat dump period and statsLevel configuration
> ---------------------------------------------------------
>
> Key: FLINK-33741
> URL: https://issues.apache.org/jira/browse/FLINK-33741
> Project: Flink
> Issue Type: New Feature
> Reporter: xiaogang zhou
> Priority: Major
>
> I'd like to introduce two RocksDB-statistics-related configuration options. Then we can customize the stats:
>
> {code:java}
> Statistics s = new Statistics();
> s.setStatsLevel(StatsLevel.EXCEPT_TIME_FOR_MUTEX);
> currentOptions.setStatsDumpPeriodSec(internalGetOption(RocksDBConfigurableOptions.STATISTIC_DUMP_PERIOD))
>         .setStatistics(s);
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-33741) introduce stat dump period and statsLevel configuration
[ https://issues.apache.org/jira/browse/FLINK-33741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17794941#comment-17794941 ]

xiaogang zhou commented on FLINK-33741:
---------------------------------------

And I think we can also parse the multi-line string of the RocksDB statistics. Then we can directly export these response-time latency numbers in metrics.

> introduce stat dump period and statsLevel configuration
> ---------------------------------------------------------
>
> Key: FLINK-33741
> URL: https://issues.apache.org/jira/browse/FLINK-33741
> Project: Flink
> Issue Type: New Feature
> Reporter: xiaogang zhou
> Priority: Major
>
> I'd like to introduce two RocksDB-statistics-related configuration options. Then we can customize the stats:
>
> {code:java}
> Statistics s = new Statistics();
> s.setStatsLevel(StatsLevel.EXCEPT_TIME_FOR_MUTEX);
> currentOptions.setStatsDumpPeriodSec(internalGetOption(RocksDBConfigurableOptions.STATISTIC_DUMP_PERIOD))
>         .setStatistics(s);
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
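A rough sketch of the parsing idea discussed here: the statistics dump is a multi-line string whose histogram lines typically look like `rocksdb.db.get.micros P50 : 12.0 P95 : 80.0 P99 : 150.0 ...`. The exact line format should be verified against the RocksDB version in use; the regex below is an assumption based on that typical layout.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hedged sketch: extracts P99 values from the multi-line RocksDB
// statistics string (e.g. from Statistics#toString() or
// db.getProperty("rocksdb.stats"), depending on what is dumped).
class RocksDBStatsParser {

    private static final Pattern HISTOGRAM_LINE = Pattern.compile(
            "^(\\S+)\\s+P50 : ([\\d.]+) P95 : ([\\d.]+) P99 : ([\\d.]+).*");

    static Map<String, Double> parseP99(String multiLineStats) {
        Map<String, Double> p99ByMetric = new HashMap<>();
        for (String line : multiLineStats.split("\n")) {
            Matcher m = HISTOGRAM_LINE.matcher(line.trim());
            if (m.matches()) {
                // group(1) is the metric name, group(4) the P99 value.
                p99ByMetric.put(m.group(1), Double.parseDouble(m.group(4)));
            }
        }
        return p99ByMetric;
    }
}
{code}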
[jira] [Comment Edited] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17794900#comment-17794900 ]

xiaogang zhou edited comment on FLINK-33728 at 12/9/23 9:52 AM:
----------------------------------------------------------------

[~gyfora] My proposal is to keep the JobManager running after a watch failure and not re-watch before the next requestResource call. A healthy watch listener can get two kinds of notification from Kubernetes: pod added and pod deleted.

1. Pod added is necessary when requesting resources; when we are not requesting resources, this notification is allowed to be lost.

2. Pod deleted lets us detect pod failures more quickly, but we can also discover them via the akka heartbeat timeout.

According to the statements above, we can tolerate the loss of the watch connection when we are not requesting resources.

was (Author: zhoujira86):
[~gyfora] My proposal is to keep the JobManager running even if the re-watch fails. A healthy watch listener can get two kinds of notification from Kubernetes: pod added and pod deleted.

1. Pod added is necessary when requesting resources; when we are not requesting resources, this notification is allowed to be lost.

2. Pod deleted lets us detect pod failures more quickly, but we can also discover them via the akka heartbeat timeout.

According to the statements above, we can tolerate the loss of the watch connection when we are not requesting resources.

> do not rewatch when KubernetesResourceManagerDriver watch fail
> ---------------------------------------------------------------
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
> Issue Type: New Feature
> Components: Deployment / Kubernetes
> Reporter: xiaogang zhou
> Priority: Major
> Labels: pull-request-available
>
> I met a massive production problem when Kubernetes etcd responded slowly. After Kubernetes recovered an hour later, thousands of Flink jobs using the KubernetesResourceManagerDriver re-watched upon receiving ResourceVersionTooOld, which put great pressure on the API server and made the API server fail again...
>
> I am not sure whether it is necessary to call getResourceEventHandler().onError(throwable) in the PodCallbackHandlerImpl#handleError method.
>
> We can just ignore the disconnection of the watching process and try to re-watch once a new requestResource is called, and we can leverage the akka heartbeat timeout to discover TM failures, just like YARN mode does.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17794900#comment-17794900 ]

xiaogang zhou commented on FLINK-33728:
---------------------------------------

[~gyfora] My proposal is to keep the JobManager running even if the re-watch fails. A healthy watch listener can get two kinds of notification from Kubernetes: pod added and pod deleted.

1. Pod added is necessary when requesting resources; when we are not requesting resources, this notification is allowed to be lost.

2. Pod deleted lets us detect pod failures more quickly, but we can also discover them via the akka heartbeat timeout.

According to the statements above, we can tolerate the loss of the watch connection when we are not requesting resources.

> do not rewatch when KubernetesResourceManagerDriver watch fail
> ---------------------------------------------------------------
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
> Issue Type: New Feature
> Components: Deployment / Kubernetes
> Reporter: xiaogang zhou
> Priority: Major
>
> I met a massive production problem when Kubernetes etcd responded slowly. After Kubernetes recovered an hour later, thousands of Flink jobs using the KubernetesResourceManagerDriver re-watched upon receiving ResourceVersionTooOld, which put great pressure on the API server and made the API server fail again...
>
> I am not sure whether it is necessary to call getResourceEventHandler().onError(throwable) in the PodCallbackHandlerImpl#handleError method.
>
> We can just ignore the disconnection of the watching process and try to re-watch once a new requestResource is called, and we can leverage the akka heartbeat timeout to discover TM failures, just like YARN mode does.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-33741) introduce stat dump period and statsLevel configuration
xiaogang zhou created FLINK-33741: - Summary: introduce stat dump period and statsLevel configuration Key: FLINK-33741 URL: https://issues.apache.org/jira/browse/FLINK-33741 Project: Flink Issue Type: New Feature Reporter: xiaogang zhou I'd like to introduce two RocksDB statistics-related configuration options, so that we can customize stats collection: {code:java} Statistics s = new Statistics(); s.setStatsLevel(StatsLevel.EXCEPT_TIME_FOR_MUTEX); currentOptions.setStatsDumpPeriodSec(internalGetOption(RocksDBConfigurableOptions.STATISTIC_DUMP_PERIOD)) .setStatistics(s); {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
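As a rough illustration, the snippet above could be expanded as follows. The Statistics, StatsLevel and DBOptions calls are the existing RocksDB Java API, while the dump-period value and the helper method are stand-ins for the two configuration options this ticket proposes:
{code:java}
// Sketch under the assumption that the two new Flink options exist;
// only the RocksDB API calls below are real today.
import org.rocksdb.DBOptions;
import org.rocksdb.Statistics;
import org.rocksdb.StatsLevel;

public class StatisticsSetupSketch {

    public static DBOptions applyStatistics(
            DBOptions currentOptions, int dumpPeriodSec, StatsLevel level) {
        Statistics statistics = new Statistics();
        // EXCEPT_TIME_FOR_MUTEX skips the costly mutex timing counters.
        statistics.setStatsLevel(level);
        return currentOptions
                // How often (in seconds) stats are dumped to the LOG file.
                .setStatsDumpPeriodSec(dumpPeriodSec)
                .setStatistics(statistics);
    }

    public static void main(String[] args) {
        DBOptions options =
                applyStatistics(new DBOptions(), 600, StatsLevel.EXCEPT_TIME_FOR_MUTEX);
        options.close();
    }
}
{code}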
[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793092#comment-17793092 ] xiaogang zhou commented on FLINK-33728: --- [~wangyang0918] [~mapohl] [~gyfora] Would you please let me know your thoughts? > do not rewatch when KubernetesResourceManagerDriver watch fail > -- > > Key: FLINK-33728 > URL: https://issues.apache.org/jira/browse/FLINK-33728 > Project: Flink > Issue Type: New Feature > Components: Deployment / Kubernetes >Reporter: xiaogang zhou >Priority: Major > > I met a massive production problem when kubernetes ETCD responded slowly. > After Kube recovered one hour later, thousands of Flink jobs using > kubernetesResourceManagerDriver rewatched upon receiving > ResourceVersionTooOld, which put great pressure on the API Server and made > the API server fail again... > > I am not sure whether it is necessary to call > getResourceEventHandler().onError(throwable) > in the PodCallbackHandlerImpl#handleError method. > > We can simply ignore the disconnection of the watching process and try to rewatch > once a new requestResource is called. And we can rely on the akka heartbeat > timeout to discover TM failures, just like YARN mode does. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-33728: -- Description: I met a massive production problem when kubernetes ETCD responded slowly. After Kube recovered one hour later, thousands of Flink jobs using kubernetesResourceManagerDriver rewatched upon receiving ResourceVersionTooOld, which put great pressure on the API Server and made the API server fail again... I am not sure whether it is necessary to call getResourceEventHandler().onError(throwable) in the PodCallbackHandlerImpl#handleError method. We can simply ignore the disconnection of the watching process and try to rewatch once a new requestResource is called. And we can rely on the akka heartbeat timeout to discover TM failures, just like YARN mode does. was: is it necessary to call getResourceEventHandler().onError(throwable) in the PodCallbackHandlerImpl#handleError method? We can simply ignore the disconnection of the watching process and try to rewatch once a new requestResource is called > do not rewatch when KubernetesResourceManagerDriver watch fail > -- > > Key: FLINK-33728 > URL: https://issues.apache.org/jira/browse/FLINK-33728 > Project: Flink > Issue Type: New Feature > Components: Deployment / Kubernetes >Reporter: xiaogang zhou >Priority: Major > > I met a massive production problem when kubernetes ETCD responded slowly. > After Kube recovered one hour later, thousands of Flink jobs using > kubernetesResourceManagerDriver rewatched upon receiving > ResourceVersionTooOld, which put great pressure on the API Server and made > the API server fail again... > > I am not sure whether it is necessary to call > getResourceEventHandler().onError(throwable) > in the PodCallbackHandlerImpl#handleError method. > > We can simply ignore the disconnection of the watching process and try to rewatch > once a new requestResource is called. And we can rely on the akka heartbeat > timeout to discover TM failures, just like YARN mode does. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-33728: -- Description: is it necessary to call getResourceEventHandler().onError(throwable) in the PodCallbackHandlerImpl#handleError method? We can simply ignore the disconnection of the watching process and try to rewatch once a new requestResource is called > do not rewatch when KubernetesResourceManagerDriver watch fail > -- > > Key: FLINK-33728 > URL: https://issues.apache.org/jira/browse/FLINK-33728 > Project: Flink > Issue Type: New Feature > Components: Deployment / Kubernetes >Reporter: xiaogang zhou >Priority: Major > > is it necessary to call > getResourceEventHandler().onError(throwable) > in the PodCallbackHandlerImpl#handleError method? > > We can simply ignore the disconnection of the watching process and try to rewatch > once a new requestResource is called -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
xiaogang zhou created FLINK-33728: - Summary: do not rewatch when KubernetesResourceManagerDriver watch fail Key: FLINK-33728 URL: https://issues.apache.org/jira/browse/FLINK-33728 Project: Flink Issue Type: New Feature Components: Deployment / Kubernetes Reporter: xiaogang zhou -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33249) comment should be parsed by StringLiteral() instead of SqlCharStringLiteral to avoid parsing failure
[ https://issues.apache.org/jira/browse/FLINK-33249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1035#comment-1035 ] xiaogang zhou commented on FLINK-33249: --- Hi [~martijnvisser] https://issues.apache.org/jira/browse/CALCITE-6001 will improve CALCITE to omit the charset from the generated literal when it is the default charset of the DIALECT. maybe wait for the CALCITE future version and set the FLINK DIALECT to use UTF-8 > comment should be parsed by StringLiteral() instead of SqlCharStringLiteral > to avoid parsing failure > > > Key: FLINK-33249 > URL: https://issues.apache.org/jira/browse/FLINK-33249 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.17.1 >Reporter: xiaogang zhou >Priority: Major > Labels: pull-request-available > > this problem is also recorded in calcite > > https://issues.apache.org/jira/browse/CALCITE-6046 > > Hi, I found this problem when I used below code to split SQL statements. the > process is SQL string -> SqlNode -> SQL String > {code:java} > // code placeholder > SqlParser.Config parserConfig = getCurrentSqlParserConfig(sqlDialect); > SqlParser sqlParser = SqlParser.create(sqlContent, parserConfig); > SqlNodeList sqlNodeList = sqlParser.parseStmtList(); > sqlParser.parse(sqlNodeList.get(0));{code} > the Dialect/ SqlConformance is a costumed one: > [https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java] > > > then I found below SQL > {code:java} > // code placeholder > CREATE TABLE source ( > a BIGINT > ) comment '测试test' > WITH ( > 'connector' = 'test' > ); {code} > transformed to > {code:java} > // code placeholder > CREATE TABLE `source` ( > `a` BIGINT > ) > COMMENT u&'\5218\51eftest' WITH ( > 'connector' = 'test' > ) {code} > > and the SQL parser template is like > {code:java} > // code placeholder > SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) : > { > final SqlParserPos startPos = s.pos(); > boolean ifNotExists = false; > SqlIdentifier tableName; > List constraints = new > ArrayList(); > SqlWatermark watermark = null; > SqlNodeList columnList = SqlNodeList.EMPTY; >SqlCharStringLiteral comment = null; >SqlTableLike tableLike = null; > SqlNode asQuery = null; > SqlNodeList propertyList = SqlNodeList.EMPTY; > SqlNodeList partitionColumns = SqlNodeList.EMPTY; > SqlParserPos pos = startPos; > } > { > > ifNotExists = IfNotExistsOpt() > tableName = CompoundIdentifier() > [ > { pos = getPos(); TableCreationContext ctx = new > TableCreationContext();} > TableColumn(ctx) > ( > TableColumn(ctx) > )* > { > pos = pos.plus(getPos()); > columnList = new SqlNodeList(ctx.columnList, pos); > constraints = ctx.constraints; > watermark = ctx.watermark; > } > > ] > [ { > String p = SqlParserUtil.parseString(token.image); > comment = SqlLiteral.createCharString(p, getPos()); > }] > [ > > partitionColumns = ParenthesizedSimpleIdentifierList() > ] > [ > > propertyList = TableProperties() > ] > [ > > tableLike = SqlTableLike(getPos()) > { > return new SqlCreateTableLike(startPos.plus(getPos()), > tableName, > columnList, > constraints, > propertyList, > partitionColumns, > watermark, > comment, > tableLike, > isTemporary, > ifNotExists); > } > | > > asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) > { > return new SqlCreateTableAs(startPos.plus(getPos()), > tableName, > columnList, > constraints, > propertyList, > partitionColumns, > watermark, > comment, > 
asQuery, > isTemporary, > ifNotExists); > } > ] > { > return new SqlCreateTable(startPos.plus(getPos()), > tableName, > columnList, > constraints, > propertyList, > partitionColumns, > watermark, > comment, > isTemporary, > ifNotExists); > } > } {code} > will give a exception : > Caused by: org.apache.calcite.sql.parser.SqlParseException: Encounter
[jira] [Commented] (FLINK-33249) comment should be parsed by StringLiteral() instead of SqlCharStringLiteral to avoid parsing failure
[ https://issues.apache.org/jira/browse/FLINK-33249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17774468#comment-17774468 ] xiaogang zhou commented on FLINK-33249: --- [~martijnvisser] Sure thing, Let me provide a test to demonstrate the issue. > comment should be parsed by StringLiteral() instead of SqlCharStringLiteral > to avoid parsing failure > > > Key: FLINK-33249 > URL: https://issues.apache.org/jira/browse/FLINK-33249 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.17.1 >Reporter: xiaogang zhou >Priority: Major > Labels: pull-request-available > > this problem is also recorded in calcite > > https://issues.apache.org/jira/browse/CALCITE-6046 > > Hi, I found this problem when I used below code to split SQL statements. the > process is SQL string -> SqlNode -> SQL String > {code:java} > // code placeholder > SqlParser.Config parserConfig = getCurrentSqlParserConfig(sqlDialect); > SqlParser sqlParser = SqlParser.create(sqlContent, parserConfig); > SqlNodeList sqlNodeList = sqlParser.parseStmtList(); > sqlParser.parse(sqlNodeList.get(0));{code} > the Dialect/ SqlConformance is a costumed one: > [https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java] > > > then I found below SQL > {code:java} > // code placeholder > CREATE TABLE source ( > a BIGINT > ) comment '测试test' > WITH ( > 'connector' = 'test' > ); {code} > transformed to > {code:java} > // code placeholder > CREATE TABLE `source` ( > `a` BIGINT > ) > COMMENT u&'\5218\51eftest' WITH ( > 'connector' = 'test' > ) {code} > > and the SQL parser template is like > {code:java} > // code placeholder > SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) : > { > final SqlParserPos startPos = s.pos(); > boolean ifNotExists = false; > SqlIdentifier tableName; > List constraints = new > ArrayList(); > SqlWatermark watermark = null; > SqlNodeList columnList = SqlNodeList.EMPTY; >SqlCharStringLiteral comment = null; >SqlTableLike tableLike = null; > SqlNode asQuery = null; > SqlNodeList propertyList = SqlNodeList.EMPTY; > SqlNodeList partitionColumns = SqlNodeList.EMPTY; > SqlParserPos pos = startPos; > } > { > > ifNotExists = IfNotExistsOpt() > tableName = CompoundIdentifier() > [ > { pos = getPos(); TableCreationContext ctx = new > TableCreationContext();} > TableColumn(ctx) > ( > TableColumn(ctx) > )* > { > pos = pos.plus(getPos()); > columnList = new SqlNodeList(ctx.columnList, pos); > constraints = ctx.constraints; > watermark = ctx.watermark; > } > > ] > [ { > String p = SqlParserUtil.parseString(token.image); > comment = SqlLiteral.createCharString(p, getPos()); > }] > [ > > partitionColumns = ParenthesizedSimpleIdentifierList() > ] > [ > > propertyList = TableProperties() > ] > [ > > tableLike = SqlTableLike(getPos()) > { > return new SqlCreateTableLike(startPos.plus(getPos()), > tableName, > columnList, > constraints, > propertyList, > partitionColumns, > watermark, > comment, > tableLike, > isTemporary, > ifNotExists); > } > | > > asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) > { > return new SqlCreateTableAs(startPos.plus(getPos()), > tableName, > columnList, > constraints, > propertyList, > partitionColumns, > watermark, > comment, > asQuery, > isTemporary, > ifNotExists); > } > ] > { > return new SqlCreateTable(startPos.plus(getPos()), > tableName, > columnList, > constraints, > propertyList, > partitionColumns, > 
watermark, > comment, > isTemporary, > ifNotExists); > } > } {code} > will give a exception : > Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered > "u&\'\\5218\\51eftest\'" at line 4, column 9. > Was expecting: > ... > > so I think all the SqlCharStringLiteral should be replaced by StringLiteral() -- This message was sent by
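A rough sketch of such a test could look like the following; it assumes the flink-sql-parser module is on the classpath and uses a dialect whose charset cannot encode the comment, so the details are illustrative rather than the exact test that will be contributed:
{code:java}
// Illustrative round-trip: parse -> unparse -> parse again.
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;

public class CommentRoundTripSketch {

    private static SqlParser.Config flinkConfig() {
        return SqlParser.config().withParserFactory(FlinkSqlParserImpl.FACTORY);
    }

    public static void main(String[] args) throws Exception {
        String sql = "CREATE TABLE source (a BIGINT) COMMENT '测试test' "
                + "WITH ('connector' = 'test')";

        // The first parse succeeds.
        SqlNodeList stmts = SqlParser.create(sql, flinkConfig()).parseStmtList();

        // Unparsing renders the non-ASCII comment as a Unicode literal,
        // e.g. COMMENT u&'...' as shown in the description above.
        String unparsed =
                stmts.get(0).toSqlString(AnsiSqlDialect.DEFAULT, false).getSql();

        // Re-parsing then fails with SqlParseException, because the
        // grammar's COMMENT clause only accepts a plain quoted string
        // (SqlCharStringLiteral), not a Unicode string literal.
        SqlParser.create(unparsed, flinkConfig()).parseStmtList();
    }
}
{code}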
[jira] [Commented] (FLINK-33249) comment should be parsed by StringLiteral() instead of SqlCharStringLiteral to avoid parsing failure
[ https://issues.apache.org/jira/browse/FLINK-33249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17774426#comment-17774426 ] xiaogang zhou commented on FLINK-33249: --- [~martijnvisser] Hi Martin, I am not sure whether this is something to be fixed in calcite. As SqlCreateTable template is in flink parser. I have attached a url, would you please have a glance at it? > comment should be parsed by StringLiteral() instead of SqlCharStringLiteral > to avoid parsing failure > > > Key: FLINK-33249 > URL: https://issues.apache.org/jira/browse/FLINK-33249 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.17.1 >Reporter: xiaogang zhou >Priority: Major > Labels: pull-request-available > > this problem is also recorded in calcite > > https://issues.apache.org/jira/browse/CALCITE-6046 > > Hi, I found this problem when I used below code to split SQL statements. the > process is SQL string -> SqlNode -> SQL String > {code:java} > // code placeholder > SqlParser.Config parserConfig = getCurrentSqlParserConfig(sqlDialect); > SqlParser sqlParser = SqlParser.create(sqlContent, parserConfig); > SqlNodeList sqlNodeList = sqlParser.parseStmtList(); > sqlParser.parse(sqlNodeList.get(0));{code} > the Dialect/ SqlConformance is a costumed one: > [https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java] > > > then I found below SQL > {code:java} > // code placeholder > CREATE TABLE source ( > a BIGINT > ) comment '测试test' > WITH ( > 'connector' = 'test' > ); {code} > transformed to > {code:java} > // code placeholder > CREATE TABLE `source` ( > `a` BIGINT > ) > COMMENT u&'\5218\51eftest' WITH ( > 'connector' = 'test' > ) {code} > > and the SQL parser template is like > {code:java} > // code placeholder > SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) : > { > final SqlParserPos startPos = s.pos(); > boolean ifNotExists = false; > SqlIdentifier tableName; > List constraints = new > ArrayList(); > SqlWatermark watermark = null; > SqlNodeList columnList = SqlNodeList.EMPTY; >SqlCharStringLiteral comment = null; >SqlTableLike tableLike = null; > SqlNode asQuery = null; > SqlNodeList propertyList = SqlNodeList.EMPTY; > SqlNodeList partitionColumns = SqlNodeList.EMPTY; > SqlParserPos pos = startPos; > } > { > > ifNotExists = IfNotExistsOpt() > tableName = CompoundIdentifier() > [ > { pos = getPos(); TableCreationContext ctx = new > TableCreationContext();} > TableColumn(ctx) > ( > TableColumn(ctx) > )* > { > pos = pos.plus(getPos()); > columnList = new SqlNodeList(ctx.columnList, pos); > constraints = ctx.constraints; > watermark = ctx.watermark; > } > > ] > [ { > String p = SqlParserUtil.parseString(token.image); > comment = SqlLiteral.createCharString(p, getPos()); > }] > [ > > partitionColumns = ParenthesizedSimpleIdentifierList() > ] > [ > > propertyList = TableProperties() > ] > [ > > tableLike = SqlTableLike(getPos()) > { > return new SqlCreateTableLike(startPos.plus(getPos()), > tableName, > columnList, > constraints, > propertyList, > partitionColumns, > watermark, > comment, > tableLike, > isTemporary, > ifNotExists); > } > | > > asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) > { > return new SqlCreateTableAs(startPos.plus(getPos()), > tableName, > columnList, > constraints, > propertyList, > partitionColumns, > watermark, > comment, > asQuery, > isTemporary, > ifNotExists); > } > ] > { > return new 
SqlCreateTable(startPos.plus(getPos()), > tableName, > columnList, > constraints, > propertyList, > partitionColumns, > watermark, > comment, > isTemporary, > ifNotExists); > } > } {code} > will give a exception : > Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered > "u&\'\\5218\\51eftest\'" at line 4, column 9. > Was expectin
[jira] [Commented] (FLINK-33249) comment should be parsed by StringLiteral() instead of SqlCharStringLiteral to avoid parsing failure
[ https://issues.apache.org/jira/browse/FLINK-33249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17774339#comment-17774339 ] xiaogang zhou commented on FLINK-33249: --- I'd like to take this ticket > comment should be parsed by StringLiteral() instead of SqlCharStringLiteral > to avoid parsing failure > > > Key: FLINK-33249 > URL: https://issues.apache.org/jira/browse/FLINK-33249 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.17.1 >Reporter: xiaogang zhou >Priority: Major > > this problem is also recorded in calcite > > https://issues.apache.org/jira/browse/CALCITE-6046 > > Hi, I found this problem when I used below code to split SQL statements. the > process is SQL string -> SqlNode -> SQL String > {code:java} > // code placeholder > SqlParser.Config parserConfig = getCurrentSqlParserConfig(sqlDialect); > SqlParser sqlParser = SqlParser.create(sqlContent, parserConfig); > SqlNodeList sqlNodeList = sqlParser.parseStmtList(); > sqlParser.parse(sqlNodeList.get(0));{code} > the Dialect/ SqlConformance is a costumed one: > [https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java] > > > then I found below SQL > {code:java} > // code placeholder > CREATE TABLE source ( > a BIGINT > ) comment '测试test' > WITH ( > 'connector' = 'test' > ); {code} > transformed to > {code:java} > // code placeholder > CREATE TABLE `source` ( > `a` BIGINT > ) > COMMENT u&'\5218\51eftest' WITH ( > 'connector' = 'test' > ) {code} > > and the SQL parser template is like > {code:java} > // code placeholder > SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) : > { > final SqlParserPos startPos = s.pos(); > boolean ifNotExists = false; > SqlIdentifier tableName; > List constraints = new > ArrayList(); > SqlWatermark watermark = null; > SqlNodeList columnList = SqlNodeList.EMPTY; >SqlCharStringLiteral comment = null; >SqlTableLike tableLike = null; > SqlNode asQuery = null; > SqlNodeList propertyList = SqlNodeList.EMPTY; > SqlNodeList partitionColumns = SqlNodeList.EMPTY; > SqlParserPos pos = startPos; > } > { > > ifNotExists = IfNotExistsOpt() > tableName = CompoundIdentifier() > [ > { pos = getPos(); TableCreationContext ctx = new > TableCreationContext();} > TableColumn(ctx) > ( > TableColumn(ctx) > )* > { > pos = pos.plus(getPos()); > columnList = new SqlNodeList(ctx.columnList, pos); > constraints = ctx.constraints; > watermark = ctx.watermark; > } > > ] > [ { > String p = SqlParserUtil.parseString(token.image); > comment = SqlLiteral.createCharString(p, getPos()); > }] > [ > > partitionColumns = ParenthesizedSimpleIdentifierList() > ] > [ > > propertyList = TableProperties() > ] > [ > > tableLike = SqlTableLike(getPos()) > { > return new SqlCreateTableLike(startPos.plus(getPos()), > tableName, > columnList, > constraints, > propertyList, > partitionColumns, > watermark, > comment, > tableLike, > isTemporary, > ifNotExists); > } > | > > asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) > { > return new SqlCreateTableAs(startPos.plus(getPos()), > tableName, > columnList, > constraints, > propertyList, > partitionColumns, > watermark, > comment, > asQuery, > isTemporary, > ifNotExists); > } > ] > { > return new SqlCreateTable(startPos.plus(getPos()), > tableName, > columnList, > constraints, > propertyList, > partitionColumns, > watermark, > comment, > isTemporary, > ifNotExists); > } > } {code} > will give a 
exception : > Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered > "u&\'\\5218\\51eftest\'" at line 4, column 9. > Was expecting: > ... > > so I think all the SqlCharStringLiteral should be replaced by StringLiteral() -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33249) comment should be parsed by StringLiteral() instead of SqlCharStringLiteral to avoid parsing failure
[ https://issues.apache.org/jira/browse/FLINK-33249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-33249: -- Description: this problem is also recorded in calcite https://issues.apache.org/jira/browse/CALCITE-6046 Hi, I found this problem when I used below code to split SQL statements. the process is SQL string -> SqlNode -> SQL String {code:java} // code placeholder SqlParser.Config parserConfig = getCurrentSqlParserConfig(sqlDialect); SqlParser sqlParser = SqlParser.create(sqlContent, parserConfig); SqlNodeList sqlNodeList = sqlParser.parseStmtList(); sqlParser.parse(sqlNodeList.get(0));{code} the Dialect/ SqlConformance is a costumed one: [https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java] then I found below SQL {code:java} // code placeholder CREATE TABLE source ( a BIGINT ) comment '测试test' WITH ( 'connector' = 'test' ); {code} transformed to {code:java} // code placeholder CREATE TABLE `source` ( `a` BIGINT ) COMMENT u&'\5218\51eftest' WITH ( 'connector' = 'test' ) {code} and the SQL parser template is like {code:java} // code placeholder SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) : { final SqlParserPos startPos = s.pos(); boolean ifNotExists = false; SqlIdentifier tableName; List constraints = new ArrayList(); SqlWatermark watermark = null; SqlNodeList columnList = SqlNodeList.EMPTY; SqlCharStringLiteral comment = null; SqlTableLike tableLike = null; SqlNode asQuery = null; SqlNodeList propertyList = SqlNodeList.EMPTY; SqlNodeList partitionColumns = SqlNodeList.EMPTY; SqlParserPos pos = startPos; } { ifNotExists = IfNotExistsOpt() tableName = CompoundIdentifier() [ { pos = getPos(); TableCreationContext ctx = new TableCreationContext();} TableColumn(ctx) ( TableColumn(ctx) )* { pos = pos.plus(getPos()); columnList = new SqlNodeList(ctx.columnList, pos); constraints = ctx.constraints; watermark = ctx.watermark; } ] [ { String p = SqlParserUtil.parseString(token.image); comment = SqlLiteral.createCharString(p, getPos()); }] [ partitionColumns = ParenthesizedSimpleIdentifierList() ] [ propertyList = TableProperties() ] [ tableLike = SqlTableLike(getPos()) { return new SqlCreateTableLike(startPos.plus(getPos()), tableName, columnList, constraints, propertyList, partitionColumns, watermark, comment, tableLike, isTemporary, ifNotExists); } | asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) { return new SqlCreateTableAs(startPos.plus(getPos()), tableName, columnList, constraints, propertyList, partitionColumns, watermark, comment, asQuery, isTemporary, ifNotExists); } ] { return new SqlCreateTable(startPos.plus(getPos()), tableName, columnList, constraints, propertyList, partitionColumns, watermark, comment, isTemporary, ifNotExists); } } {code} will give a exception : Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "u&\'\\5218\\51eftest\'" at line 4, column 9. Was expecting: ... 
so I think all the SqlCharStringLiteral should be replaced by > comment should be parsed by StringLiteral() instead of SqlCharStringLiteral > to avoid parsing failure > > > Key: FLINK-33249 > URL: https://issues.apache.org/jira/browse/FLINK-33249 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.17.1 >Reporter: xiaogang zhou >Priority: Major > > this problem is also recorded in calcite > > https://issues.apache.org/jira/browse/CALCITE-6046 > > Hi, I found this problem when I used below code to split SQL statements. the > process is SQL string -> SqlNode -> SQL String > {code:java} > // code placeholder > SqlParser.Config parserConfig = getCurrentSqlParserConfig(sqlDialect); > SqlParser sqlParser = SqlParser.create(sqlContent, parserConfig); > SqlNodeList sqlNodeList = sqlParser.parseStmtList(); > s
[jira] [Updated] (FLINK-33249) comment should be parsed by StringLiteral() instead of SqlCharStringLiteral to avoid parsing failure
[ https://issues.apache.org/jira/browse/FLINK-33249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-33249: -- Description: this problem is also recorded in calcite https://issues.apache.org/jira/browse/CALCITE-6046 Hi, I found this problem when I used below code to split SQL statements. the process is SQL string -> SqlNode -> SQL String {code:java} // code placeholder SqlParser.Config parserConfig = getCurrentSqlParserConfig(sqlDialect); SqlParser sqlParser = SqlParser.create(sqlContent, parserConfig); SqlNodeList sqlNodeList = sqlParser.parseStmtList(); sqlParser.parse(sqlNodeList.get(0));{code} the Dialect/ SqlConformance is a costumed one: [https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java] then I found below SQL {code:java} // code placeholder CREATE TABLE source ( a BIGINT ) comment '测试test' WITH ( 'connector' = 'test' ); {code} transformed to {code:java} // code placeholder CREATE TABLE `source` ( `a` BIGINT ) COMMENT u&'\5218\51eftest' WITH ( 'connector' = 'test' ) {code} and the SQL parser template is like {code:java} // code placeholder SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) : { final SqlParserPos startPos = s.pos(); boolean ifNotExists = false; SqlIdentifier tableName; List constraints = new ArrayList(); SqlWatermark watermark = null; SqlNodeList columnList = SqlNodeList.EMPTY; SqlCharStringLiteral comment = null; SqlTableLike tableLike = null; SqlNode asQuery = null; SqlNodeList propertyList = SqlNodeList.EMPTY; SqlNodeList partitionColumns = SqlNodeList.EMPTY; SqlParserPos pos = startPos; } { ifNotExists = IfNotExistsOpt() tableName = CompoundIdentifier() [ { pos = getPos(); TableCreationContext ctx = new TableCreationContext();} TableColumn(ctx) ( TableColumn(ctx) )* { pos = pos.plus(getPos()); columnList = new SqlNodeList(ctx.columnList, pos); constraints = ctx.constraints; watermark = ctx.watermark; } ] [ { String p = SqlParserUtil.parseString(token.image); comment = SqlLiteral.createCharString(p, getPos()); }] [ partitionColumns = ParenthesizedSimpleIdentifierList() ] [ propertyList = TableProperties() ] [ tableLike = SqlTableLike(getPos()) { return new SqlCreateTableLike(startPos.plus(getPos()), tableName, columnList, constraints, propertyList, partitionColumns, watermark, comment, tableLike, isTemporary, ifNotExists); } | asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) { return new SqlCreateTableAs(startPos.plus(getPos()), tableName, columnList, constraints, propertyList, partitionColumns, watermark, comment, asQuery, isTemporary, ifNotExists); } ] { return new SqlCreateTable(startPos.plus(getPos()), tableName, columnList, constraints, propertyList, partitionColumns, watermark, comment, isTemporary, ifNotExists); } } {code} will give a exception : Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "u&\'\\5218\\51eftest\'" at line 4, column 9. Was expecting: ... so I think all the SqlCharStringLiteral should be replaced by StringLiteral() was: this problem is also recorded in calcite https://issues.apache.org/jira/browse/CALCITE-6046 Hi, I found this problem when I used below code to split SQL statements. 
the process is SQL string -> SqlNode -> SQL String {code:java} // code placeholder SqlParser.Config parserConfig = getCurrentSqlParserConfig(sqlDialect); SqlParser sqlParser = SqlParser.create(sqlContent, parserConfig); SqlNodeList sqlNodeList = sqlParser.parseStmtList(); sqlParser.parse(sqlNodeList.get(0));{code} the Dialect/ SqlConformance is a costumed one: [https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java] then I found below SQL {code:java} // code placeholder CREATE TABLE source ( a BIGINT ) comment '测试test' WITH ( 'connector' = 'test' ); {code} transformed to {code:java} // code placeholder CREATE TABLE `source` ( `a` BIGINT ) COMMENT u&'\5218\51eftest' WITH ( '
[jira] [Created] (FLINK-33249) comment should be parsed by StringLiteral() instead of SqlCharStringLiteral to avoid parsing failure
xiaogang zhou created FLINK-33249: - Summary: comment should be parsed by StringLiteral() instead of SqlCharStringLiteral to avoid parsing failure Key: FLINK-33249 URL: https://issues.apache.org/jira/browse/FLINK-33249 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.17.1 Reporter: xiaogang zhou -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33174) enabling tablesample bernoulli in flink
[ https://issues.apache.org/jira/browse/FLINK-33174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17773306#comment-17773306 ] xiaogang zhou commented on FLINK-33174: --- [~libenchao] [~lsy] [~martijnvisser] Thanks all for your comments. Let me prepare a FLIP first and wait for the Calcite upgrade. > enabling tablesample bernoulli in flink > --- > > Key: FLINK-33174 > URL: https://issues.apache.org/jira/browse/FLINK-33174 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.17.1 >Reporter: xiaogang zhou >Priority: Major > > I'd like to introduce a table sample function to enable fast sampling on > streams. > This is inspired by https://issues.apache.org/jira/browse/CALCITE-5971 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33174) enabling tablesample bernoulli in flink
[ https://issues.apache.org/jira/browse/FLINK-33174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17772820#comment-17772820 ] xiaogang zhou commented on FLINK-33174: --- [~libenchao] Hi Benchao, would you please assign this issue to me? We can then decide whether to wait for the Calcite bump or find another way > enabling tablesample bernoulli in flink > --- > > Key: FLINK-33174 > URL: https://issues.apache.org/jira/browse/FLINK-33174 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.17.1 >Reporter: xiaogang zhou >Priority: Major > > I'd like to introduce a table sample function to enable fast sampling on > streams. > This is inspired by https://issues.apache.org/jira/browse/CALCITE-5971 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33174) enabling tablesample bernoulli in flink
[ https://issues.apache.org/jira/browse/FLINK-33174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17770822#comment-17770822 ] xiaogang zhou commented on FLINK-33174: --- [~lsy] [~libenchao] Hi, would you please help review this suggestion? > enabling tablesample bernoulli in flink > --- > > Key: FLINK-33174 > URL: https://issues.apache.org/jira/browse/FLINK-33174 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.17.1 >Reporter: xiaogang zhou >Priority: Major > > I'd like to introduce a table sample function to enable fast sampling on > streams. > This is inspired by https://issues.apache.org/jira/browse/CALCITE-5971 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33174) enabling tablesample bernoulli in flink
xiaogang zhou created FLINK-33174: - Summary: enabling tablesample bernoulli in flink Key: FLINK-33174 URL: https://issues.apache.org/jira/browse/FLINK-33174 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.17.1 Reporter: xiaogang zhou I'd like to introduce a table sample function to enable fast sampling on streams. This is inspired by https://issues.apache.org/jira/browse/CALCITE-5971 -- This message was sent by Atlassian Jira (v8.20.10#820010)
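For illustration, the sampling could eventually look like the following from the Table API. Note this is only a sketch of the proposed syntax, following the standard TABLESAMPLE BERNOULLI that CALCITE-5971 discusses; it does not run on current Flink releases:
{code:java}
// Proposed usage sketch -- TABLESAMPLE BERNOULLI is NOT yet supported
// by the Flink planner; the syntax follows the SQL standard / Calcite.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TableSampleSketch {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        env.executeSql(
                "CREATE TABLE source (a BIGINT) WITH ('connector' = 'datagen')");

        // Each row is kept independently with 10% probability, which is
        // cheap enough to apply to an unbounded stream.
        env.executeSql("SELECT a FROM source TABLESAMPLE BERNOULLI(10)").print();
    }
}
{code}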
[jira] [Updated] (FLINK-33162) seperate the executor in DefaultDispatcherResourceManagerComponentFactory for MetricFetcher and webMonitorEndpoint
[ https://issues.apache.org/jira/browse/FLINK-33162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-33162: -- Description: when starting a job with large number of taskmanagers, the jobmanager of the job failed to respond to and rest request. when look into the jstack we found all the 4 threads are server metrics fetcher. {code:java} // code placeholder "Flink-DispatcherRestEndpoint-thread-4" #91 daemon prio=5 os_prio=0 tid=0x7f17e7823000 nid=0x246 waiting for monitor entry [0x7f178e9fe000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81) - waiting to lock <0x0003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore) at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244) at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Locked ownable synchronizers:- <0x0003ce80d8f0> (a java.util.concurrent.ThreadPoolExecutor$Worker) "Flink-DispatcherRestEndpoint-thread-3" #88 daemon prio=5 os_prio=0 tid=0x7f17e88af000 nid=0x243 waiting for monitor entry [0x7f1790dfe000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81) - waiting to lock <0x0003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore) at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244) at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Locked ownable synchronizers:- <0x0003ce80df88> (a 
java.util.concurrent.ThreadPoolExecutor$Worker) "Flink-DispatcherRestEndpoint-thread-2" #79 daemon prio=5 os_prio=0 tid=0x7f1793473800 nid=0x23a runnable [0x7f17922fd000] java.lang.Thread.State: RUNNABLE at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.add(MetricStore.java:216) at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:82) - locked <0x0003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore) at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244) at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Sche
[jira] [Updated] (FLINK-33162) seperate the executor in DefaultDispatcherResourceManagerComponentFactory for MetricFetcher and webMonitorEndpoint
[ https://issues.apache.org/jira/browse/FLINK-33162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-33162: -- Affects Version/s: 1.13.1 (was: 1.16.0) > seperate the executor in DefaultDispatcherResourceManagerComponentFactory for > MetricFetcher and webMonitorEndpoint > -- > > Key: FLINK-33162 > URL: https://issues.apache.org/jira/browse/FLINK-33162 > Project: Flink > Issue Type: Improvement > Components: Runtime / REST >Affects Versions: 1.13.1 >Reporter: xiaogang zhou >Priority: Major > Fix For: 1.19.0 > > > when starting a job with large number of taskmanagers, the jobmanager of the > job failed to respond to and rest request. when look into the jstack we found > all the 4 threads are server metrics fetcher. > {code:java} > // code placeholder > "Flink-DispatcherRestEndpoint-thread-4" #91 daemon prio=5 os_prio=0 > tid=0x7f17e7823000 nid=0x246 waiting for monitor entry > [0x7f178e9fe000] java.lang.Thread.State: BLOCKED (on object monitor) > at > org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81) > - waiting to lock <0x0003d5f62638> (a > org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore) at > org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244) > at > org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown > Source) at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) >at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) >at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) >at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Locked ownable synchronizers: - <0x0003ce80d8f0> (a > java.util.concurrent.ThreadPoolExecutor$Worker) > "Flink-DispatcherRestEndpoint-thread-3" #88 daemon prio=5 os_prio=0 > tid=0x7f17e88af000 nid=0x243 waiting for monitor entry > [0x7f1790dfe000] java.lang.Thread.State: BLOCKED (on object monitor) > at > org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81) > - waiting to lock <0x0003d5f62638> (a > org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore) at > org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244) > at > org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown > Source) at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) >at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) >at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) at > 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) >at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Locked ownable synchronizers: - <0x0003ce80df88> (a > java.util.concurrent.ThreadPoolExecutor$Worker) > "Flink-DispatcherRestEndpoint-thread-2" #79 daemon prio=5 os_prio=0 > tid=0x7f1793473800 nid=0x23a runnable [0x7f17922fd000] > java.lang.Thread.State: RUNNABLE at > org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.add(MetricStore.java:216) >at > org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:82) > - locked <0x0003d5f62638> (a > org.apac
[jira] [Updated] (FLINK-33162) seperate the executor in DefaultDispatcherResourceManagerComponentFactory for MetricFetcher and webMonitorEndpoint
[ https://issues.apache.org/jira/browse/FLINK-33162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-33162: -- Description: when starting a job with large number of taskmanagers, the jobmanager of the job failed to respond to and rest request. when look into the jstack we found all the 4 threads are server metrics fetcher. {code:java} // code placeholder "Flink-DispatcherRestEndpoint-thread-4" #91 daemon prio=5 os_prio=0 tid=0x7f17e7823000 nid=0x246 waiting for monitor entry [0x7f178e9fe000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81) - waiting to lock <0x0003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore) at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244) at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Locked ownable synchronizers:- <0x0003ce80d8f0> (a java.util.concurrent.ThreadPoolExecutor$Worker) "Flink-DispatcherRestEndpoint-thread-3" #88 daemon prio=5 os_prio=0 tid=0x7f17e88af000 nid=0x243 waiting for monitor entry [0x7f1790dfe000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81) - waiting to lock <0x0003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore) at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244) at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Locked ownable synchronizers:- <0x0003ce80df88> (a 
java.util.concurrent.ThreadPoolExecutor$Worker) "Flink-DispatcherRestEndpoint-thread-2" #79 daemon prio=5 os_prio=0 tid=0x7f1793473800 nid=0x23a runnable [0x7f17922fd000] java.lang.Thread.State: RUNNABLE at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.add(MetricStore.java:216) at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:82) - locked <0x0003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore) at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244) at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Sche
[jira] [Created] (FLINK-33162) seperate the executor in DefaultDispatcherResourceManagerComponentFactory for MetricFetcher and webMonitorEndpoint
xiaogang zhou created FLINK-33162: - Summary: seperate the executor in DefaultDispatcherResourceManagerComponentFactory for MetricFetcher and webMonitorEndpoint Key: FLINK-33162 URL: https://issues.apache.org/jira/browse/FLINK-33162 Project: Flink Issue Type: Improvement Components: Runtime / REST Affects Versions: 1.16.0 Reporter: xiaogang zhou Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
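The idea can be sketched as follows: give the MetricFetcher its own pool instead of sharing the DispatcherRestEndpoint executor, so that threads blocked on the MetricStore lock (as in the jstack above) cannot starve the REST handlers. Names and pool sizes here are illustrative, not the actual factory code:
{code:java}
// Illustrative sketch of separating the executors; hypothetical wiring.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class SeparatedExecutorsSketch {

    public static void main(String[] args) {
        // Pool that serves REST requests for the WebMonitorEndpoint.
        ScheduledExecutorService restExecutor =
                Executors.newScheduledThreadPool(
                        4, r -> new Thread(r, "Flink-DispatcherRestEndpoint"));

        // Dedicated pool for querying and merging metrics, so slow
        // MetricStore updates cannot block REST handler threads.
        ScheduledExecutorService metricFetcherExecutor =
                Executors.newScheduledThreadPool(
                        2, r -> new Thread(r, "Flink-MetricFetcher"));

        // The webMonitorEndpoint would be created with restExecutor,
        // while the MetricFetcher would be handed metricFetcherExecutor.

        restExecutor.shutdown();
        metricFetcherExecutor.shutdown();
    }
}
{code}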
[jira] [Comment Edited] (FLINK-33038) remove getMinRetentionTime in StreamExecDeduplicate
[ https://issues.apache.org/jira/browse/FLINK-33038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762248#comment-17762248 ] xiaogang zhou edited comment on FLINK-33038 at 9/6/23 2:26 AM: --- [~Sergey Nuyanzin] fixed, would you please have a quick review? was (Author: zhoujira86): [~Sergey Nuyanzin] [https://github.com/apache/flink/pull/23360] fixed, would you please have a quick review? > remove getMinRetentionTime in StreamExecDeduplicate > --- > > Key: FLINK-33038 > URL: https://issues.apache.org/jira/browse/FLINK-33038 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: xiaogang zhou >Assignee: xiaogang zhou >Priority: Minor > Fix For: 1.19.0 > > > I suggest removing the getMinRetentionTime method in StreamExecDeduplicate, > as it is not called anywhere and the TTL is controlled by the state metadata. > > Please let me take this issue if possible -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33038) remove getMinRetentionTime in StreamExecDeduplicate
[ https://issues.apache.org/jira/browse/FLINK-33038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762248#comment-17762248 ] xiaogang zhou commented on FLINK-33038: --- [~Sergey Nuyanzin] [https://github.com/apache/flink/pull/23360] fixed, would you please have a quick review? > remove getMinRetentionTime in StreamExecDeduplicate > --- > > Key: FLINK-33038 > URL: https://issues.apache.org/jira/browse/FLINK-33038 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: xiaogang zhou >Assignee: xiaogang zhou >Priority: Minor > Fix For: 1.19.0 > > > I suggest removing the getMinRetentionTime method in StreamExecDeduplicate, > as it is not called anywhere and the TTL is controlled by the state metadata. > > Please let me take this issue if possible -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33038) remove getMinRetentionTime in StreamExecDeduplicate
[ https://issues.apache.org/jira/browse/FLINK-33038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762032#comment-17762032 ] xiaogang zhou commented on FLINK-33038: --- [~snuyanzin] would you please assign this to me? Also cc'ing the modifier [~qingyue] > remove getMinRetentionTime in StreamExecDeduplicate > --- > > Key: FLINK-33038 > URL: https://issues.apache.org/jira/browse/FLINK-33038 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: xiaogang zhou >Priority: Minor > Fix For: 1.19.0 > > > I suggest removing the getMinRetentionTime method in StreamExecDeduplicate, > as it is not called anywhere and the TTL is controlled by the state metadata. > > Please let me take this issue if possible -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33038) remove getMinRetentionTime in StreamExecDeduplicate
[ https://issues.apache.org/jira/browse/FLINK-33038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-33038: -- Description: I suggest removing the getMinRetentionTime method in StreamExecDeduplicate, as it is not called anywhere and the TTL is controlled by the state metadata. Please let me take this issue if possible was: I suggest to remove the StreamExecDeduplicate method in StreamExecDeduplicate as the ttl is controlled by the state meta data. Please let me take the issue if possible > remove getMinRetentionTime in StreamExecDeduplicate > --- > > Key: FLINK-33038 > URL: https://issues.apache.org/jira/browse/FLINK-33038 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: xiaogang zhou >Priority: Minor > Fix For: 1.19.0 > > > I suggest removing the getMinRetentionTime method in StreamExecDeduplicate, > as it is not called anywhere and the TTL is controlled by the state metadata. > > Please let me take this issue if possible -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33038) remove getMinRetentionTime in StreamExecDeduplicate
xiaogang zhou created FLINK-33038: - Summary: remove getMinRetentionTime in StreamExecDeduplicate Key: FLINK-33038 URL: https://issues.apache.org/jira/browse/FLINK-33038 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.18.0 Reporter: xiaogang zhou Fix For: 1.19.0 I suggest to remove the StreamExecDeduplicate method in StreamExecDeduplicate as the ttl is controlled by the state meta data. Please let me take the issue if possible -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-22059) add a new option is rocksdb statebackend to enable job threads setting
[ https://issues.apache.org/jira/browse/FLINK-22059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17761587#comment-17761587 ] xiaogang zhou commented on FLINK-22059: --- [~Zakelly] yes > add a new option is rocksdb statebackend to enable job threads setting > -- > > Key: FLINK-22059 > URL: https://issues.apache.org/jira/browse/FLINK-22059 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.12.2 >Reporter: xiaogang zhou >Assignee: xiaogang zhou >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned, stale-assigned > Fix For: 1.18.0 > > > As discussed in FLINK-21688, we are now using the setIncreaseParallelism > function to set the number of rocksdb's working threads. > > Can we add another setting key to set rocksdb's max background jobs, which > would allow a larger number of flush threads? -- This message was sent by Atlassian Jira (v8.20.10#820010)
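For reference, such an option could be exposed through a custom options factory along these lines. DBOptions.setMaxBackgroundJobs is the existing RocksDB Java API; wiring it to a Flink configuration key is the proposal, and the factory below is only a sketch:
{code:java}
// Sketch: expose max background jobs via a custom options factory.
import java.util.Collection;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class MaxBackgroundJobsFactory implements RocksDBOptionsFactory {

    private final int maxBackgroundJobs;

    public MaxBackgroundJobsFactory(int maxBackgroundJobs) {
        this.maxBackgroundJobs = maxBackgroundJobs;
    }

    @Override
    public DBOptions createDBOptions(
            DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Bounds the total number of flush + compaction threads,
        // superseding the setIncreaseParallelism-only tuning.
        return currentOptions.setMaxBackgroundJobs(maxBackgroundJobs);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }
}
{code}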
[jira] [Updated] (FLINK-32765) create view should reuse calcite tree
[ https://issues.apache.org/jira/browse/FLINK-32765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-32765: -- Description: {code:java} CREATE TABLE source ( k2 bigint, o1 bigint ) COMMENT 'source' WITH ( 'connector' = 'datagen' ); CREATE TABLE print_table ( k1 bigint ) WITH ( 'connector' = 'blackhole' ); CREATE TABLE print_table2 ( k1 bigint ) WITH ( 'connector' = 'blackhole' ); create view v1 as select k2*2 as k2 from source where o1 > 20; insert into print_table select k2 from v1 where k2 >9; insert into print_table2 select k2 from v1 where k2 <80; {code} This SQL causes the logic of view v1 to be built twice. Why can't we create a Table in executeInternal and keep a QueryOperationCatalogView in the CatalogManager? {code:java} public TableResult executeInternal(String statement) ... else if (operation instanceof CreateViewOperation) { CreateViewOperation createViewOperation = (CreateViewOperation) operation; Table table = sqlQuery(createViewOperation.getCatalogView().getOriginalQuery()); {code} this would let us avoid running codegen multiple times if we could reuse part of the query was: {code:java} CREATE TABLE source ( k2 bigint, o1 bigint ) COMMENT 'source' WITH ( 'connector' = 'datagen' ); CREATE TABLE print_table ( k1 bigint ) WITH ( 'connector' = 'blackhole' ); CREATE TABLE print_table2 ( k1 bigint ) WITH ( 'connector' = 'blackhole' ); create view v1 as select k2*2 as k2 from source where o1 > 20; insert into print_table select k2 from v1 where k2 >9; insert into print_table2 select k2 from v1 where k2 <80; {code} This SQL causes the logic of view v1 to be built twice. Why can't we create a Table in executeSql and keep a QueryOperationCatalogView in the CatalogManager? {code:java} public TableResult executeSql(String statement) ... else if (operation instanceof CreateViewOperation) { CreateViewOperation createViewOperation = (CreateViewOperation) operation; Table table = sqlQuery(createViewOperation.getCatalogView().getOriginalQuery()); {code} this would let us avoid running codegen multiple times if we could reuse part of the query > create view should reuse calcite tree > - > > Key: FLINK-32765 > URL: https://issues.apache.org/jira/browse/FLINK-32765 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > > {code:java} > CREATE TABLE source ( > k2 bigint, > o1 bigint > ) > COMMENT 'source' > WITH ( > 'connector' = 'datagen' > ); > CREATE TABLE print_table ( > k1 bigint > ) WITH ( > 'connector' = 'blackhole' > ); > CREATE TABLE print_table2 ( > k1 bigint > ) WITH ( > 'connector' = 'blackhole' > ); > create view v1 as > select k2*2 as k2 from source where o1 > 20; > insert into print_table select k2 from v1 where k2 >9; > insert into print_table2 select k2 from v1 where k2 <80; {code} > This SQL causes the logic of view v1 to be built twice. Why can't we > create a Table in executeInternal > and keep a QueryOperationCatalogView in CatalogManager? > {code:java} > public TableResult executeInternal(String statement) > ...
> else if (operation instanceof CreateViewOperation) { > CreateViewOperation createViewOperation = (CreateViewOperation) operation; > Table table = > sqlQuery(createViewOperation.getCatalogView().getOriginalQuery()); {code} > > > this would let us avoid running codegen multiple times if we could reuse > part of the query > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32765) create view should reuse calcite tree
[ https://issues.apache.org/jira/browse/FLINK-32765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-32765: -- Description: {code:java} // code placeholder CREATE TABLE source ( k2 bigint, o1 bigint ) COMMENT 'source' WITH ( 'connector' = 'datagen' ); CREATE TABLE print_table ( k1 bigint ) WITH ( 'connector' = 'blackhole' ); CREATE TABLE print_table2 ( k1 bigint ) WITH ( 'connector' = 'blackhole' ); create view v1 as select k2*2 as k2 from source where o1 > 20; insert into print_table select k2 from v1 where k2 >9; insert into print_table2 select k2 from v1 where k2 <80; {code} This SQL will cause view v1's logic to be created twice. Why can't we create a table in executeSql and keep a QueryOperationCatalogView in CatalogManager? {code:java} // code placeholder public TableResult executeSql(String statement) ... else if (operation instanceof CreateViewOperation) { CreateViewOperation createViewOperation = (CreateViewOperation) operation; Table table = sqlQuery(createViewOperation.getCatalogView().getOriginalQuery()); {code} this could enable us to avoid codegen multiple times if we could reuse some part of the query > create view should reuse calcite tree > - > > Key: FLINK-32765 > URL: https://issues.apache.org/jira/browse/FLINK-32765 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > > {code:java} > // code placeholder > CREATE TABLE source ( > k2 bigint, > o1 bigint > ) > COMMENT 'source' > WITH ( > 'connector' = 'datagen' > ); > CREATE TABLE print_table ( > k1 bigint > ) WITH ( > 'connector' = 'blackhole' > ); > CREATE TABLE print_table2 ( > k1 bigint > ) WITH ( > 'connector' = 'blackhole' > ); > create view v1 as > select k2*2 as k2 from source where o1 > 20; > insert into print_table select k2 from v1 where k2 >9; > insert into print_table2 select k2 from v1 where k2 <80; {code} > This SQL will cause view v1's logic to be created twice. Why can't we > create a table in executeSql and keep a QueryOperationCatalogView in > CatalogManager? > {code:java} > // code placeholder > public TableResult executeSql(String statement) > ... > else if (operation instanceof CreateViewOperation) { > CreateViewOperation createViewOperation = (CreateViewOperation) operation; > Table table = > sqlQuery(createViewOperation.getCatalogView().getOriginalQuery()); {code} > > > this could enable us to avoid codegen multiple times if we could reuse > some part of the query > -- This message was sent by Atlassian Jira (v8.20.10#820010)
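To make the proposal above concrete, here is a rough sketch of the branch the ticket suggests inside the table environment's execute path. It is an illustration under the assumption that the view's resolved QueryOperation can be registered directly; the catalogManager call shown is glue code, not a confirmed patch.

{code:java}
// Hypothetical shape of the proposed branch: parse and validate the view's
// query once, then register the resolved operation so later statements that
// reference the view reuse the same relational tree instead of re-expanding
// and re-validating the original SQL text each time.
else if (operation instanceof CreateViewOperation) {
    CreateViewOperation createViewOperation = (CreateViewOperation) operation;
    Table table = sqlQuery(createViewOperation.getCatalogView().getOriginalQuery());
    QueryOperationCatalogView view =
            new QueryOperationCatalogView(table.getQueryOperation());
    catalogManager.createTable(
            view,
            createViewOperation.getViewIdentifier(),
            createViewOperation.isIgnoreIfExists());
}
{code}

Whether the planner can then also share the expanded sub-tree between the two INSERT statements is a separate question; this sketch only removes the duplicate view expansion.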
[jira] [Updated] (FLINK-32765) create view should reuse calcite tree
[ https://issues.apache.org/jira/browse/FLINK-32765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-32765: -- Component/s: Table SQL / API (was: Table SQL / Planner) > create view should reuse calcite tree > - > > Key: FLINK-32765 > URL: https://issues.apache.org/jira/browse/FLINK-32765 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32765) create view should reuse calcite tree
xiaogang zhou created FLINK-32765: - Summary: create view should reuse calcite tree Key: FLINK-32765 URL: https://issues.apache.org/jira/browse/FLINK-32765 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.16.1 Reporter: xiaogang zhou -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32732) auto offset reset should be exposed to user
[ https://issues.apache.org/jira/browse/FLINK-32732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750532#comment-17750532 ] xiaogang zhou commented on FLINK-32732: --- [~renqs] The main defect of the current implementation is that if the earliest records expire very quickly, the task can keep restarting when it finds on startup that the offset it wants no longer exists > auto offset reset should be exposed to user > --- > > Key: FLINK-32732 > URL: https://issues.apache.org/jira/browse/FLINK-32732 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > > {code:java} > // code placeholder > maybeOverride( > ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, > > startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(), > true); {code} > now flink overrides the auto.offset.reset with the scan.startup.mode config, > and the user's explicit config does not take effect. I think maybe we should > expose this to the customer? > > I think after consuming kafka records from earliest to latest, the > scan.startup.mode should no longer influence the kafka scan behavior. So I > suggest changing the override to false. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-32732) auto offset reset should be exposed to user
[ https://issues.apache.org/jira/browse/FLINK-32732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750183#comment-17750183 ] xiaogang zhou edited comment on FLINK-32732 at 8/3/23 2:30 AM: --- [~renqs] please let me know what you think was (Author: zhoujira86): [~libenchao] [~renqs] please let me know what you think > auto offset reset should be exposed to user > --- > > Key: FLINK-32732 > URL: https://issues.apache.org/jira/browse/FLINK-32732 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > > {code:java} > // code placeholder > maybeOverride( > ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, > > startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(), > true); {code} > now flink overrides the auto.offset.reset with the scan.startup.mode config, > and the user's explicit config does not take effect. I think maybe we should > expose this to the customer? > > I think after consuming kafka records from earliest to latest, the > scan.startup.mode should no longer influence the kafka scan behavior. So I > suggest changing the override to false. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32738) PROTOBUF format supports projection push down
[ https://issues.apache.org/jira/browse/FLINK-32738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750527#comment-17750527 ] xiaogang zhou commented on FLINK-32738: --- [~libenchao] created, please help assign it to me, thanks a lot > PROTOBUF format supports projection push down > - > > Key: FLINK-32738 > URL: https://issues.apache.org/jira/browse/FLINK-32738 > Project: Flink > Issue Type: Sub-task > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.18.0 >Reporter: xiaogang zhou >Assignee: xiaogang zhou >Priority: Major > Fix For: 1.18.0 > > > support projection push down for protobuf -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32738) PROTOBUF format supports projection push down
xiaogang zhou created FLINK-32738: - Summary: PROTOBUF format supports projection push down Key: FLINK-32738 URL: https://issues.apache.org/jira/browse/FLINK-32738 Project: Flink Issue Type: Sub-task Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.18.0 Reporter: xiaogang zhou Fix For: 1.18.0 support projection push down for protobuf -- This message was sent by Atlassian Jira (v8.20.10#820010)
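Since the ticket only states the goal, here is a minimal sketch of the source-side contract involved, using Flink's SupportsProjectionPushDown ability interface. The class name and fields are illustrative; a real implementation would also implement ScanTableSource and hand the projection to the protobuf deserializer.

{code:java}
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.types.DataType;

/** Illustrative skeleton: a source that accepts projections from the planner. */
public class ProtobufProjectableSource implements SupportsProjectionPushDown {

    private int[][] projectedFields;   // index paths of the selected fields
    private DataType producedDataType; // row type after projection

    @Override
    public boolean supportsNestedProjection() {
        // protobuf messages are deeply nested, so nested projection is
        // where most of the decoding work can be skipped
        return true;
    }

    @Override
    public void applyProjection(int[][] projectedFields, DataType producedDataType) {
        // remember the projection; the runtime decoder can then skip the
        // unused protobuf fields instead of materializing the whole message
        this.projectedFields = projectedFields;
        this.producedDataType = producedDataType;
    }
}
{code}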
[jira] [Commented] (FLINK-32607) Kafka table source and json format support projection pushdown
[ https://issues.apache.org/jira/browse/FLINK-32607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750184#comment-17750184 ] xiaogang zhou commented on FLINK-32607: --- [~libenchao] Not sure if someone has taken the pb projection pushdown. If not, can you please assign it to me? > Kafka table source and json format support projection pushdown > -- > > Key: FLINK-32607 > URL: https://issues.apache.org/jira/browse/FLINK-32607 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka, Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: dalongliu >Priority: Major > Fix For: 1.18.0 > > > ProjectionPushDown has a huge performance impact and is not currently > implemented in Kafka Source, so we can support it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32732) auto offset reset should be exposed to user
[ https://issues.apache.org/jira/browse/FLINK-32732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750183#comment-17750183 ] xiaogang zhou commented on FLINK-32732: --- [~libenchao] [~renqs] please let me know what you think > auto offset reset should be exposed to user > --- > > Key: FLINK-32732 > URL: https://issues.apache.org/jira/browse/FLINK-32732 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > > {code:java} > // code placeholder > maybeOverride( > ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, > > startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(), > true); {code} > now flink overrides the auto.offset.reset with the scan.startup.mode config, > and the user's explicit config does not take effect. I think maybe we should > expose this to the customer? > > I think after consuming kafka records from earliest to latest, the > scan.startup.mode should no longer influence the kafka scan behavior. So I > suggest changing the override to false. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32732) auto offset reset should be exposed to user
[ https://issues.apache.org/jira/browse/FLINK-32732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-32732: -- Description: {code:java} // code placeholder maybeOverride( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(), true); {code} now flink overrides the auto.offset.reset with the scan.startup.mode config, and the user's explicit config does not take effect. I think maybe we should expose this to the customer? I think after consuming kafka records from earliest to latest, the scan.startup.mode should no longer influence the kafka scan behavior. So I suggest changing the override to false. was: {code:java} // code placeholder maybeOverride( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(), true); {code} now flink overrides the auto.offset.reset with the scan.startup.mode config, and the user's explicit config does not take effect. I think maybe we should expose this to the customer? > auto offset reset should be exposed to user > --- > > Key: FLINK-32732 > URL: https://issues.apache.org/jira/browse/FLINK-32732 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > > {code:java} > // code placeholder > maybeOverride( > ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, > > startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(), > true); {code} > now flink overrides the auto.offset.reset with the scan.startup.mode config, > and the user's explicit config does not take effect. I think maybe we should > expose this to the customer? > > I think after consuming kafka records from earliest to latest, the > scan.startup.mode should no longer influence the kafka scan behavior. So I > suggest changing the override to false. -- This message was sent by Atlassian Jira (v8.20.10#820010)
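The concrete change proposed in the description is a one-argument flip in KafkaSourceBuilder. A minimal sketch of what that would look like, assuming the existing maybeOverride helper keeps its current signature:

{code:java}
// Proposed: pass override = false so an 'auto.offset.reset' that the user set
// explicitly in the consumer properties wins over the value derived from
// scan.startup.mode; today the hardcoded 'true' silently replaces user config.
maybeOverride(
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
        startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(),
        false);
{code}

With that flip, a job whose committed offsets have aged out of retention could fall back to the user-chosen reset strategy instead of failing into the offset-not-found restart loop described in the comment above.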
[jira] [Updated] (FLINK-32732) auto offset reset should be exposed to user
[ https://issues.apache.org/jira/browse/FLINK-32732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-32732: -- Description: {code:java} // code placeholder maybeOverride( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(), true); {code} now flink overrides the auto.offset.reset with the scan.startup.mode config, and the user's explicit config does not take effect. I think maybe we should expose this to the customer? > auto offset reset should be exposed to user > --- > > Key: FLINK-32732 > URL: https://issues.apache.org/jira/browse/FLINK-32732 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > > {code:java} > // code placeholder > maybeOverride( > ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, > > startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(), > true); {code} > now flink overrides the auto.offset.reset with the scan.startup.mode config, > and the user's explicit config does not take effect. I think maybe we should > expose this to the customer? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32732) auto offset reset should be exposed to user
xiaogang zhou created FLINK-32732: - Summary: auto offset reset should be exposed to user Key: FLINK-32732 URL: https://issues.apache.org/jira/browse/FLINK-32732 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Affects Versions: 1.16.1 Reporter: xiaogang zhou -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32132) Cast function CODEGEN does not work as expected when nullOnFailure enabled
[ https://issues.apache.org/jira/browse/FLINK-32132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726445#comment-17726445 ] xiaogang zhou commented on FLINK-32132: --- [~luoyuxia] Can you please help review? > Cast function CODEGEN does not work as expected when nullOnFailure enabled > -- > > Key: FLINK-32132 > URL: https://issues.apache.org/jira/browse/FLINK-32132 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Assignee: xiaogang zhou >Priority: Major > Labels: pull-request-available > > I am trying to generate code casting string to bigint, and got generated code > like: > > > {code:java} > // code placeholder > if (!isNull$14) { > result$15 = > org.apache.flink.table.data.binary.BinaryStringDataUtil.toLong(field$13.trim()); > } else { > result$15 = -1L; > } >castRuleResult$16 = result$15; >castRuleResultIsNull$17 = isNull$14; > } catch (java.lang.Throwable e) { >castRuleResult$16 = -1L; >castRuleResultIsNull$17 = true; > } > // --- End cast section > out.setLong(0, castRuleResult$16); {code} > Such handling does not provide a perfect solution, as the default value > of long is set to -1L, which can be meaningful in some cases and can cause > calculation errors. > > I understand the cast returns a bigint not null, but since there is an > exception, we should ignore the type restriction, so I suggest modifying the > CodeGenUtils.rowSetField like below: > > {code:java} > // code placeholder > if (fieldType.isNullable || > fieldExpr.nullTerm.startsWith("castRuleResultIsNull")) { > s""" > |${fieldExpr.code} > |if (${fieldExpr.nullTerm}) { > | $setNullField; > |} else { > | $writeField; > |} > """.stripMargin > } else { > s""" > |${fieldExpr.code} > |$writeField; >""".stripMargin > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
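To see the defect outside of generated code, here is a small self-contained illustration. The variable names mirror the generated snippet above, and the writer calls are shown as comments because the BinaryWriter lives inside the planner runtime; this is a sketch of the behavior, not the actual codegen output.

{code:java}
public class CastNullOnFailureDemo {
    public static void main(String[] args) {
        long castRuleResult;
        boolean castRuleResultIsNull;
        try {
            castRuleResult = Long.parseLong("not a number".trim());
            castRuleResultIsNull = false;
        } catch (Throwable e) {
            castRuleResult = -1L;        // placeholder default, not a real value
            castRuleResultIsNull = true; // the flag that should be honored
        }
        // Current behavior: because the cast's result type is BIGINT NOT NULL,
        // rowSetField ignores the null flag and writes the placeholder:
        // out.setLong(0, castRuleResult);  -> downstream sees a misleading -1
        // Proposed behavior: consult the null flag first:
        if (castRuleResultIsNull) {
            System.out.println("write SQL NULL"); // out.setNullAt(0);
        } else {
            System.out.println(castRuleResult);   // out.setLong(0, castRuleResult);
        }
    }
}
{code}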
[jira] (FLINK-32115) json_value support cache
[ https://issues.apache.org/jira/browse/FLINK-32115 ] xiaogang zhou deleted comment on FLINK-32115: --- was (Author: zhoujira86): [~luoyuxia] Hi yuxia, can you please help review this? > json_value support cache > > > Key: FLINK-32115 > URL: https://issues.apache.org/jira/browse/FLINK-32115 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > > [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java] > > hive supports a json object cache for previously deserialized values; could we > consider using a cache of parsed objects in JsonValueCallGen? > > This optimization can improve the performance of SQL like > > select > json_value(A, 'xxx'), > json_value(A, 'yyy'), > json_value(A, 'zzz'), > ... > a lot > > I added a static LRU cache into SqlJsonUtils, and refactored > jsonValueExpression1 like > {code:java} > private static JsonValueContext jsonValueExpression1(String input) { > JsonValueContext parsedJsonContext = EXTRACT_OBJECT_CACHE.get(input); > if (parsedJsonContext != null) { > return parsedJsonContext; > } > try { > parsedJsonContext = JsonValueContext.withJavaObj(dejsonize(input)); > } catch (Exception e) { > parsedJsonContext = JsonValueContext.withException(e); > } > EXTRACT_OBJECT_CACHE.put(input, parsedJsonContext); > return parsedJsonContext; > } {code} > > and benchmarked it like: > {code:java} > public static void main(String[] args) { > String input = > "{\"social\":[{\"weibo\":\"https://weibo.com/xiaoming\"},{\"github\":\"https://github.com/xiaoming\"}]}"; > Long start = System.currentTimeMillis(); > for (int i = 0; i < 100; i++) { > Object dejsonize = jsonValueExpression1(input); > } > System.err.println(System.currentTimeMillis() - start); > } {code} > > the time the two benchmarks take is: > ||case||milli second taken|| > |cache|33| > |no cache|1591| > -- This message was sent by Atlassian Jira (v8.20.10#820010)
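The EXTRACT_OBJECT_CACHE referenced above is not shown in the ticket. A minimal sketch of what it could be, assuming a bounded LRU built on java.util.LinkedHashMap; the capacity and class name are illustrative:

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative bounded LRU: least-recently-accessed entries are evicted. */
public final class LruCache<K, V> extends LinkedHashMap<K, V> {

    private final int capacity;

    public LruCache(int capacity) {
        super(16, 0.75f, true); // accessOrder = true gives LRU iteration order
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > capacity; // drop the eldest once capacity is exceeded
    }
}
{code}

Note that a static cache in SqlJsonUtils would be shared across operator threads, so a real patch would need synchronization or a thread-local cache; the benchmark in the description measures only the single-threaded case.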
[jira] [Commented] (FLINK-32115) json_value support cache
[ https://issues.apache.org/jira/browse/FLINK-32115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17724801#comment-17724801 ] xiaogang zhou commented on FLINK-32115: --- [~luoyuxia] Hi yuxia, can you please help review this? > json_value support cache > > > Key: FLINK-32115 > URL: https://issues.apache.org/jira/browse/FLINK-32115 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > > [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java] > > hive supports a json object cache for previously deserialized values; could we > consider using a cache of parsed objects in JsonValueCallGen? > > This optimization can improve the performance of SQL like > > select > json_value(A, 'xxx'), > json_value(A, 'yyy'), > json_value(A, 'zzz'), > ... > a lot > > I added a static LRU cache into SqlJsonUtils, and refactored > jsonValueExpression1 like > {code:java} > private static JsonValueContext jsonValueExpression1(String input) { > JsonValueContext parsedJsonContext = EXTRACT_OBJECT_CACHE.get(input); > if (parsedJsonContext != null) { > return parsedJsonContext; > } > try { > parsedJsonContext = JsonValueContext.withJavaObj(dejsonize(input)); > } catch (Exception e) { > parsedJsonContext = JsonValueContext.withException(e); > } > EXTRACT_OBJECT_CACHE.put(input, parsedJsonContext); > return parsedJsonContext; > } {code} > > and benchmarked it like: > {code:java} > public static void main(String[] args) { > String input = > "{\"social\":[{\"weibo\":\"https://weibo.com/xiaoming\"},{\"github\":\"https://github.com/xiaoming\"}]}"; > Long start = System.currentTimeMillis(); > for (int i = 0; i < 100; i++) { > Object dejsonize = jsonValueExpression1(input); > } > System.err.println(System.currentTimeMillis() - start); > } {code} > > the time the two benchmarks take is: > ||case||milli second taken|| > |cache|33| > |no cache|1591| > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32115) json_value support cache
[ https://issues.apache.org/jira/browse/FLINK-32115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-32115: -- Description: [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java] hive supports a json object cache for previously deserialized values; could we consider using a cache of parsed objects in JsonValueCallGen? This optimization can improve the performance of SQL like select json_value(A, 'xxx'), json_value(A, 'yyy'), json_value(A, 'zzz'), ... a lot I added a static LRU cache into SqlJsonUtils, and refactored jsonValueExpression1 like {code:java} private static JsonValueContext jsonValueExpression1(String input) { JsonValueContext parsedJsonContext = EXTRACT_OBJECT_CACHE.get(input); if (parsedJsonContext != null) { return parsedJsonContext; } try { parsedJsonContext = JsonValueContext.withJavaObj(dejsonize(input)); } catch (Exception e) { parsedJsonContext = JsonValueContext.withException(e); } EXTRACT_OBJECT_CACHE.put(input, parsedJsonContext); return parsedJsonContext; } {code} and benchmarked it like: {code:java} public static void main(String[] args) { String input = "{\"social\":[{\"weibo\":\"https://weibo.com/xiaoming\"},{\"github\":\"https://github.com/xiaoming\"}]}"; Long start = System.currentTimeMillis(); for (int i = 0; i < 100; i++) { Object dejsonize = jsonValueExpression1(input); } System.err.println(System.currentTimeMillis() - start); } {code} the time the two benchmarks take is: ||case||milli second taken|| |cache|33| |no cache|1591| was: [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java] hive supports a json object cache for previously deserialized values; could we consider using a cache of parsed objects in JsonValueCallGen? This optimization can improve the performance of SQL like select json_value(A, 'xxx'), json_value(A, 'yyy'), json_value(A, 'zzz'), ... a lot I added a static LRU cache into SqlJsonUtils, and refactored jsonValueExpression1 like {code:java} private static JsonValueContext jsonValueExpression1(String input) { JsonValueContext parsedJsonContext = EXTRACT_OBJECT_CACHE.get(input); if (parsedJsonContext != null) { return parsedJsonContext; } try { parsedJsonContext = JsonValueContext.withJavaObj(dejsonize(input)); } catch (Exception e) { parsedJsonContext = JsonValueContext.withException(e); } EXTRACT_OBJECT_CACHE.put(input, parsedJsonContext); return parsedJsonContext; } {code} and benchmarked it like: {code:java} public static void main(String[] args) { String input = "{\"social\":[{\"weibo\":\"https://weibo.com/xiaoming\"},{\"github\":\"https://github.com/xiaoming\"}]}"; Long start = System.currentTimeMillis(); for (int i = 0; i < 100; i++) { Object dejsonize = jsonValueExpression1(input); } System.err.println(System.currentTimeMillis() - start); } {code} the time the two benchmarks take is: ||case||milli second taken|| |cache|33| |no cache|1591| > json_value support cache > > > Key: FLINK-32115 > URL: https://issues.apache.org/jira/browse/FLINK-32115 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > > [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java] > > hive supports a json object cache for previously deserialized values; could we > consider using a cache of parsed objects in JsonValueCallGen? > > This optimization can improve the performance of SQL like > > select > json_value(A, 'xxx'), > json_value(A, 'yyy'), > json_value(A, 'zzz'), > ... > a lot > > I added a static LRU cache into SqlJsonUtils, and refactored > jsonValueExpression1 like > {code:java} > private static JsonValueContext jsonValueExpression1(String input) { > JsonValueContext parsedJsonContext = EXTRACT_OBJECT_CACHE.get(input); > if (parsedJsonContext != null) { > return parsedJsonContext; > } > try { > parsedJsonContext = JsonValueContext.withJavaObj(dejsonize(input)); > } catch (Exception e) { > parsedJsonContext = JsonValueContext.withException(e); > } > EXTRACT_OBJECT_CACHE.put(input, parsedJsonContext); > return parsedJsonContext; > } {code} > > and benchmarked it like: > {code:java} > public static void main(String[] args) { > String input = > "{\"social\":[{\"weibo\":\"https://weibo.com/xiaoming\"},{\"github\":\"https://github.com/xiaoming\"}]}"; > Long start = System.currentTimeMillis(); > for (int i = 0; i < 100; i++) { > Object dejsonize = jsonValueExpression1(input); > } > System.err.println(System.currentTimeMillis() - start); > } {code} > > the time the two benchmarks take is: > ||case||milli second taken|| > |cache|33| > |no cache|1591| > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32115) json_value support cache
[ https://issues.apache.org/jira/browse/FLINK-32115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-32115: -- Description: [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java] hive supports a json object cache for previously deserialized values; could we consider using a cache of parsed objects in JsonValueCallGen? This optimization can improve the performance of SQL like select json_value(A, 'xxx'), json_value(A, 'yyy'), json_value(A, 'zzz'), ... a lot I added a static LRU cache into SqlJsonUtils, and refactored jsonValueExpression1 like {code:java} private static JsonValueContext jsonValueExpression1(String input) { JsonValueContext parsedJsonContext = EXTRACT_OBJECT_CACHE.get(input); if (parsedJsonContext != null) { return parsedJsonContext; } try { parsedJsonContext = JsonValueContext.withJavaObj(dejsonize(input)); } catch (Exception e) { parsedJsonContext = JsonValueContext.withException(e); } EXTRACT_OBJECT_CACHE.put(input, parsedJsonContext); return parsedJsonContext; } {code} and benchmarked it like: {code:java} public static void main(String[] args) { String input = "{\"social\":[{\"weibo\":\"https://weibo.com/xiaoming\"},{\"github\":\"https://github.com/xiaoming\"}]}"; Long start = System.currentTimeMillis(); for (int i = 0; i < 100; i++) { Object dejsonize = jsonValueExpression1(input); } System.err.println(System.currentTimeMillis() - start); } {code} the time the two benchmarks take is: ||case||milli second taken|| |cache|33| |no cache|1591| was: [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java] hive supports a json object cache for previously deserialized values; could we consider using a cache of parsed objects in JsonValueCallGen? This optimization can improve the performance of SQL like select json_value(A, 'xxx'), json_value(A, 'yyy'), json_value(A, 'zzz'), ... a lot I have tested it with SQL like (keys are replaced for security reasons) insert into blackhole select JSON_VALUE(`message`,'$.a') as scene, JSON_VALUE(`message`,'$.b') as screen_height, JSON_VALUE(`message`,'$.c') as longitude, JSON_VALUE(`message`,'$.d') as device_id, JSON_VALUE(`message`,'$.e') as receive_time, JSON_VALUE(`message`,'$.f') as app_build, JSON_VALUE(`message`,'$.g') as track_id, JSON_VALUE(`message`,'$.h') as distinct_id from xxx; the cached UDF is about twice as fast as the non-cached UDF. > json_value support cache > > > Key: FLINK-32115 > URL: https://issues.apache.org/jira/browse/FLINK-32115 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > > [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java] > > hive supports a json object cache for previously deserialized values; could we > consider using a cache of parsed objects in JsonValueCallGen? > > This optimization can improve the performance of SQL like > > select > json_value(A, 'xxx'), > json_value(A, 'yyy'), > json_value(A, 'zzz'), > ... > a lot > > I added a static LRU cache into SqlJsonUtils, and refactored > jsonValueExpression1 like > {code:java} > private static JsonValueContext jsonValueExpression1(String input) { > JsonValueContext parsedJsonContext = EXTRACT_OBJECT_CACHE.get(input); > if (parsedJsonContext != null) { > return parsedJsonContext; > } > try { > parsedJsonContext = JsonValueContext.withJavaObj(dejsonize(input)); > } catch (Exception e) { > parsedJsonContext = JsonValueContext.withException(e); > } > EXTRACT_OBJECT_CACHE.put(input, parsedJsonContext); > return parsedJsonContext; > } {code} > > and benchmarked it like: > {code:java} > public static void main(String[] args) { > String input = > "{\"social\":[{\"weibo\":\"https://weibo.com/xiaoming\"},{\"github\":\"https://github.com/xiaoming\"}]}"; > Long start = System.currentTimeMillis(); > for (int i = 0; i < 100; i++) { > Object dejsonize = jsonValueExpression1(input); > } > System.err.println(System.currentTimeMillis() - start); > } {code} > > the time the two benchmarks take is: > ||case||milli second taken|| > |cache|33| > |no cache|1591| > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32132) Cast function CODEGEN does not work as expected when nullOnFailure enabled
[ https://issues.apache.org/jira/browse/FLINK-32132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17724780#comment-17724780 ] xiaogang zhou commented on FLINK-32132: --- [~luoyuxia] yes, please assign it to me > Cast function CODEGEN does not work as expected when nullOnFailure enabled > -- > > Key: FLINK-32132 > URL: https://issues.apache.org/jira/browse/FLINK-32132 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > > I am trying to generate code casting string to bigint, and got generated code > like: > > > {code:java} > // code placeholder > if (!isNull$14) { > result$15 = > org.apache.flink.table.data.binary.BinaryStringDataUtil.toLong(field$13.trim()); > } else { > result$15 = -1L; > } >castRuleResult$16 = result$15; >castRuleResultIsNull$17 = isNull$14; > } catch (java.lang.Throwable e) { >castRuleResult$16 = -1L; >castRuleResultIsNull$17 = true; > } > // --- End cast section > out.setLong(0, castRuleResult$16); {code} > Such handling does not provide a perfect solution, as the default value > of long is set to -1L, which can be meaningful in some cases and can cause > calculation errors. > > I understand the cast returns a bigint not null, but since there is an > exception, we should ignore the type restriction, so I suggest modifying the > CodeGenUtils.rowSetField like below: > > {code:java} > // code placeholder > if (fieldType.isNullable || > fieldExpr.nullTerm.startsWith("castRuleResultIsNull")) { > s""" > |${fieldExpr.code} > |if (${fieldExpr.nullTerm}) { > | $setNullField; > |} else { > | $writeField; > |} > """.stripMargin > } else { > s""" > |${fieldExpr.code} > |$writeField; >""".stripMargin > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32132) Cast function CODEGEN does not work as expected when nullOnFailure enabled
[ https://issues.apache.org/jira/browse/FLINK-32132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-32132: -- Description: I am trying to generate code casting string to bigint, and got generated code like: {code:java} // code placeholder if (!isNull$14) { result$15 = org.apache.flink.table.data.binary.BinaryStringDataUtil.toLong(field$13.trim()); } else { result$15 = -1L; } castRuleResult$16 = result$15; castRuleResultIsNull$17 = isNull$14; } catch (java.lang.Throwable e) { castRuleResult$16 = -1L; castRuleResultIsNull$17 = true; } // --- End cast section out.setLong(0, castRuleResult$16); {code} Such handling does not provide a perfect solution, as the default value of long is set to -1L, which can be meaningful in some cases and can cause calculation errors. I understand the cast returns a bigint not null, but since there is an exception, we should ignore the type restriction, so I suggest modifying the CodeGenUtils.rowSetField like below: {code:java} // code placeholder if (fieldType.isNullable || fieldExpr.nullTerm.startsWith("castRuleResultIsNull")) { s""" |${fieldExpr.code} |if (${fieldExpr.nullTerm}) { | $setNullField; |} else { | $writeField; |} """.stripMargin } else { s""" |${fieldExpr.code} |$writeField; """.stripMargin } {code} > Cast function CODEGEN does not work as expected when nullOnFailure enabled > -- > > Key: FLINK-32132 > URL: https://issues.apache.org/jira/browse/FLINK-32132 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > > I am trying to generate code casting string to bigint, and got generated code > like: > > > {code:java} > // code placeholder > if (!isNull$14) { > result$15 = > org.apache.flink.table.data.binary.BinaryStringDataUtil.toLong(field$13.trim()); > } else { > result$15 = -1L; > } >castRuleResult$16 = result$15; >castRuleResultIsNull$17 = isNull$14; > } catch (java.lang.Throwable e) { >castRuleResult$16 = -1L; >castRuleResultIsNull$17 = true; > } > // --- End cast section > out.setLong(0, castRuleResult$16); {code} > Such handling does not provide a perfect solution, as the default value > of long is set to -1L, which can be meaningful in some cases and can cause > calculation errors. > > I understand the cast returns a bigint not null, but since there is an > exception, we should ignore the type restriction, so I suggest modifying the > CodeGenUtils.rowSetField like below: > > {code:java} > // code placeholder > if (fieldType.isNullable || > fieldExpr.nullTerm.startsWith("castRuleResultIsNull")) { > s""" > |${fieldExpr.code} > |if (${fieldExpr.nullTerm}) { > | $setNullField; > |} else { > | $writeField; > |} > """.stripMargin > } else { > s""" > |${fieldExpr.code} > |$writeField; >""".stripMargin > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32132) Cast function CODEGEN does not work as expected when nullOnFailure enabled
xiaogang zhou created FLINK-32132: - Summary: Cast function CODEGEN does not work as expected when nullOnFailure enabled Key: FLINK-32132 URL: https://issues.apache.org/jira/browse/FLINK-32132 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.16.1 Reporter: xiaogang zhou -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32115) json_value support cache
[ https://issues.apache.org/jira/browse/FLINK-32115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-32115: -- Description: [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java] hive supports a json object cache for previously deserialized values; could we consider using a cache of parsed objects in JsonValueCallGen? This optimization can improve the performance of SQL like select json_value(A, 'xxx'), json_value(A, 'yyy'), json_value(A, 'zzz'), ... a lot I have tested it with SQL like (keys are replaced for security reasons) insert into blackhole select JSON_VALUE(`message`,'$.a') as scene, JSON_VALUE(`message`,'$.b') as screen_height, JSON_VALUE(`message`,'$.c') as longitude, JSON_VALUE(`message`,'$.d') as device_id, JSON_VALUE(`message`,'$.e') as receive_time, JSON_VALUE(`message`,'$.f') as app_build, JSON_VALUE(`message`,'$.g') as track_id, JSON_VALUE(`message`,'$.h') as distinct_id from xxx; the cached UDF is about twice as fast as the non-cached UDF. was: [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java] hive supports a json object cache for previously deserialized values; could we consider using a cache of parsed objects in JsonValueCallGen? This optimization can improve the performance of SQL like select json_value(A, 'xxx'), json_value(A, 'yyy'), json_value(A, 'zzz'), ... a lot > json_value support cache > > > Key: FLINK-32115 > URL: https://issues.apache.org/jira/browse/FLINK-32115 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > > [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java] > > hive supports a json object cache for previously deserialized values; could we > consider using a cache of parsed objects in JsonValueCallGen? > > This optimization can improve the performance of SQL like > > select > json_value(A, 'xxx'), > json_value(A, 'yyy'), > json_value(A, 'zzz'), > ... > a lot > > > I have tested it with SQL like (keys are replaced for security reasons) > > insert into blackhole > select > JSON_VALUE(`message`,'$.a') as scene, > JSON_VALUE(`message`,'$.b') as screen_height, > JSON_VALUE(`message`,'$.c') as longitude, > JSON_VALUE(`message`,'$.d') as device_id, > JSON_VALUE(`message`,'$.e') as receive_time, > JSON_VALUE(`message`,'$.f') as app_build, > JSON_VALUE(`message`,'$.g') as track_id, > JSON_VALUE(`message`,'$.h') as distinct_id > from xxx; > > the cached UDF is about twice as fast as the non-cached UDF. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32115) json_value support cache
[ https://issues.apache.org/jira/browse/FLINK-32115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-32115: -- Description: [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java] hive supports a json object cache for previously deserialized values; could we consider using a cache of parsed objects in JsonValueCallGen? This optimization can improve the performance of SQL like select json_value(A, 'xxx'), json_value(A, 'yyy'), json_value(A, 'zzz'), ... a lot was: [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java] hive supports a json object cache for previously deserialized values; could we consider using a cache of parsed objects in JsonValueCallGen? > json_value support cache > > > Key: FLINK-32115 > URL: https://issues.apache.org/jira/browse/FLINK-32115 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > > [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java] > > hive supports a json object cache for previously deserialized values; could we > consider using a cache of parsed objects in JsonValueCallGen? > > This optimization can improve the performance of SQL like > > select > json_value(A, 'xxx'), > json_value(A, 'yyy'), > json_value(A, 'zzz'), > ... > a lot -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32115) json_value support cache
xiaogang zhou created FLINK-32115: - Summary: json_value support cache Key: FLINK-32115 URL: https://issues.apache.org/jira/browse/FLINK-32115 Project: Flink Issue Type: Improvement Components: Table SQL / Runtime Affects Versions: 1.16.1 Reporter: xiaogang zhou [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java] hive supports a json object cache for previously deserialized values; could we consider using a cache of parsed objects in JsonValueCallGen? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-16009) Native support for the Variable-length and Zig-zag Variable-length integers
[ https://issues.apache.org/jira/browse/FLINK-16009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719745#comment-17719745 ] xiaogang zhou commented on FLINK-16009: --- [~nkruber] [~gaoyunhaii] Hi, I have created a FLIP wiki page [https://cwiki.apache.org/confluence/display/FLINK/%5BWIP%5DFLIP-310%3Ause+VARINT+and+ZIGZAG+to+encode+ROWDATA+in+state] Can you please help review it and let me know whether I can take this ticket? > Native support for the Variable-length and Zig-zag Variable-length integers > --- > > Key: FLINK-16009 > URL: https://issues.apache.org/jira/browse/FLINK-16009 > Project: Flink > Issue Type: Improvement > Components: API / Type Serialization System >Reporter: Yun Gao >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor > > Currently Flink only supports fixed-length integers. However, in many cases > the values of the integer fields tend to be small, and we could reduce the > size of serialized values by using [Variable length > encoding|https://developers.google.com/protocol-buffers/docs/encoding#varints] > and [Zig-zag variable-length > encoding|https://developers.google.com/protocol-buffers/docs/encoding#signed-integers]. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
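For reference, the two encodings the ticket links to are small enough to show in full. This is a self-contained demo in the style of protobuf's wire format, not Flink serializer code:

{code:java}
import java.io.ByteArrayOutputStream;

public final class VarIntDemo {

    // varint: 7 payload bits per byte, high bit set while more bytes follow
    static void writeVarLong(long v, ByteArrayOutputStream out) {
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7FL) | 0x80L));
            v >>>= 7;
        }
        out.write((int) v);
    }

    // zig-zag: maps small negative values to small unsigned ones
    // (-1 -> 1, 1 -> 2, -2 -> 3, ...) so they also stay short under varint
    static long zigZag(long v) {
        return (v << 1) ^ (v >> 63);
    }

    public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarLong(zigZag(-3L), out);  // 1 byte instead of a fixed 8
        writeVarLong(zigZag(300L), out); // 2 bytes instead of a fixed 8
        System.out.println(out.size());  // prints 3
    }
}
{code}

Without zig-zag, a plain varint of a negative long would always take the maximum 10 bytes, which is why the two encodings are proposed together.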
[jira] [Comment Edited] (FLINK-31225) rocksdb max open file can lead to oom
[ https://issues.apache.org/jira/browse/FLINK-31225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693953#comment-17693953 ] xiaogang zhou edited comment on FLINK-31225 at 2/27/23 10:54 AM: - [~yunta] I logged onto the container, did a listing, and found: /tmp/flink-io-b7483fcd-50aa-45d2-bd8e-a2b0862c6323/job__op_LegacyKeyedCoProcessOperator_a3bf5557de0062839a60e12819947e17__12_50__uuid_b91b9766-b8af-4de3-9a01-b5dcce29a7bf/db# ll | grep sst -rw-r--r-- 1 flink flink 30662931 Feb 27 18:20 002512.sst -rw-r--r-- 1 flink flink 1788364 Feb 27 18:30 002513.sst -rw-r--r-- 1 flink flink 3209306 Feb 27 18:40 002515.sst -rw-r--r-- 1 flink flink 13443570 Feb 27 18:40 002517.sst -rw-r--r-- 1 flink flink 1694438 Feb 27 18:50 002518.sst -rw-r--r-- 1 flink flink 1509487 Feb 27 18:50 002519.sst I think the LSM tree merge process will delete L0 files. And as mentioned in [https://github.com/facebook/rocksdb/issues/4112], someone noted that this is not exactly a leak, but a lot of memory is allocated and never released: const int table_cache_size = (mutable_db_options_.max_open_files == -1) ? TableCache::kInfiniteCapacity : mutable_db_options_.max_open_files - 10; table_cache_ = NewLRUCache(table_cache_size, immutable_db_options_.table_cache_numshardbits); all allocated records are stored in this cache. mutable_db_options_.max_open_files equals -1, so table_cache_size = kInfiniteCapacity (about 4M). It seems the mutable_db_options_.max_open_files = -1 configuration will keep TableReaders (even if the sst file has already been deleted?) in memory, which will cause memory to keep growing. was (Author: zhoujira86): [~yunta] I logged onto the container, did a listing, and found: /tmp/flink-io-b7483fcd-50aa-45d2-bd8e-a2b0862c6323/job__op_LegacyKeyedCoProcessOperator_a3bf5557de0062839a60e12819947e17__12_50__uuid_b91b9766-b8af-4de3-9a01-b5dcce29a7bf/db# ll | grep sst -rw-r--r-- 1 flink flink 30662931 Feb 27 18:20 002512.sst -rw-r--r-- 1 flink flink 1788364 Feb 27 18:30 002513.sst -rw-r--r-- 1 flink flink 3209306 Feb 27 18:40 002515.sst -rw-r--r-- 1 flink flink 13443570 Feb 27 18:40 002517.sst -rw-r--r-- 1 flink flink 1694438 Feb 27 18:50 002518.sst -rw-r--r-- 1 flink flink 1509487 Feb 27 18:50 002519.sst I think the LSM tree merge process will delete L0 files. And as mentioned in [https://github.com/facebook/rocksdb/issues/4112], someone noted that this is not exactly a leak, but a lot of memory is allocated and never released: const int table_cache_size = (mutable_db_options_.max_open_files == -1) ? TableCache::kInfiniteCapacity : mutable_db_options_.max_open_files - 10; table_cache_ = NewLRUCache(table_cache_size, immutable_db_options_.table_cache_numshardbits); all allocated records are stored in this cache. mutable_db_options_.max_open_files equals -1, so table_cache_size = kInfiniteCapacity (about 4M). It seems the mutable_db_options_.max_open_files = -1 configuration will keep TableReaders in memory, which will cause memory to keep growing. > rocksdb max open file can lead to oom > -- > > Key: FLINK-31225 > URL: https://issues.apache.org/jira/browse/FLINK-31225 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > Attachments: image-2023-02-26-12-08-49-717.png, leak_test(2).png > > > the default value for > state.backend.rocksdb.files.open > is -1 > > [https://github.com/facebook/rocksdb/issues/4112] this issue told us that > rocksdb will not close fds, so this can lead to an oom issue. > > also I can reproduce the situation in my environment. the left part (2/21-2/24) > leaves max open files at -1, the right part (2/24 till now) sets max open files > to 300. the memory growth is very different. > !image-2023-02-26-12-08-49-717.png|width=616,height=285! > > I have also attached a jeprof for the 2/21-2/24 instance; the tm memory size is > about 8GB, heap memory is about 2.6GB, and the native part in leak_test is about > 1GB. I think the remaining part (8-2.6-1) is occupied by fds. > > I suggest setting this to a default value like 500. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31225) rocksdb max open file can lead to oom
[ https://issues.apache.org/jira/browse/FLINK-31225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693953#comment-17693953 ] xiaogang zhou commented on FLINK-31225: --- [~yunta] I logged onto the container, did a listing, and found: /tmp/flink-io-b7483fcd-50aa-45d2-bd8e-a2b0862c6323/job__op_LegacyKeyedCoProcessOperator_a3bf5557de0062839a60e12819947e17__12_50__uuid_b91b9766-b8af-4de3-9a01-b5dcce29a7bf/db# ll | grep sst -rw-r--r-- 1 flink flink 30662931 Feb 27 18:20 002512.sst -rw-r--r-- 1 flink flink 1788364 Feb 27 18:30 002513.sst -rw-r--r-- 1 flink flink 3209306 Feb 27 18:40 002515.sst -rw-r--r-- 1 flink flink 13443570 Feb 27 18:40 002517.sst -rw-r--r-- 1 flink flink 1694438 Feb 27 18:50 002518.sst -rw-r--r-- 1 flink flink 1509487 Feb 27 18:50 002519.sst I think the LSM tree merge process will delete L0 files. And as mentioned in [https://github.com/facebook/rocksdb/issues/4112], someone noted that this is not exactly a leak, but a lot of memory is allocated and never released: const int table_cache_size = (mutable_db_options_.max_open_files == -1) ? TableCache::kInfiniteCapacity : mutable_db_options_.max_open_files - 10; table_cache_ = NewLRUCache(table_cache_size, immutable_db_options_.table_cache_numshardbits); all allocated records are stored in this cache. mutable_db_options_.max_open_files equals -1, so table_cache_size = kInfiniteCapacity (about 4M). It seems the mutable_db_options_.max_open_files = -1 configuration will keep TableReaders in memory, which will cause memory to keep growing. > rocksdb max open file can lead to oom > -- > > Key: FLINK-31225 > URL: https://issues.apache.org/jira/browse/FLINK-31225 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > Attachments: image-2023-02-26-12-08-49-717.png, leak_test(2).png > > > the default value for > state.backend.rocksdb.files.open > is -1 > > [https://github.com/facebook/rocksdb/issues/4112] this issue told us that > rocksdb will not close fds, so this can lead to an oom issue. > > also I can reproduce the situation in my environment. the left part (2/21-2/24) > leaves max open files at -1, the right part (2/24 till now) sets max open files > to 300. the memory growth is very different. > !image-2023-02-26-12-08-49-717.png|width=616,height=285! > > I have also attached a jeprof for the 2/21-2/24 instance; the tm memory size is > about 8GB, heap memory is about 2.6GB, and the native part in leak_test is about > 1GB. I think the remaining part (8-2.6-1) is occupied by fds. > > I suggest setting this to a default value like 500. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
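The workaround discussed above maps to an existing, documented setting. A minimal sketch of applying it programmatically; the value 300 is simply the number used in the experiment above:

{code:java}
import org.apache.flink.configuration.Configuration;

public class RocksDBOpenFilesConfig {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // With the default -1, every opened SST's table reader (index and
        // filter blocks) stays pinned in the table cache even after the file
        // is compacted away; a finite cap turns the cache into a real LRU.
        conf.setString("state.backend.rocksdb.files.open", "300");
    }
}
{code}

The same line can of course go into flink-conf.yaml; the trade-off is extra file open/close work on reads versus a bounded table cache.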
[jira] [Updated] (FLINK-31225) rocksdb max open file can lead to oom
[ https://issues.apache.org/jira/browse/FLINK-31225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-31225: -- Description: the default value for state.backend.rocksdb.files.open is -1 [https://github.com/facebook/rocksdb/issues/4112] this issue told us that rocksdb will not close fds, so this can lead to an oom issue. also I can reproduce the situation in my environment. the left part (2/21-2/24) leaves max open files at -1, the right part (2/24 till now) sets max open files to 300. the memory growth is very different. !image-2023-02-26-12-08-49-717.png|width=616,height=285! I have also attached a jeprof for the 2/21-2/24 instance; the tm memory size is about 8GB, heap memory is about 2.6GB, and the native part in leak_test is about 1GB. I think the remaining part (8-2.6-1) is occupied by fds. I suggest setting this to a default value like 500. was: the default value for state.backend.rocksdb.files.open is -1 [https://github.com/facebook/rocksdb/issues/4112] this issue told us that rocksdb will not close fds, so this can lead to an oom issue. also I can reproduce the situation in my environment. the left part (2/21-2/24) leaves max open files at -1, the right part (2/24 till now) sets max open files to 300. the memory growth is very different. !image-2023-02-26-12-08-49-717.png|width=616,height=285! I have also attached a jeprof for the 2/21-2/24 instance; the tm memory size is about 8GB, heap memory is about 2.6GB, and the native part in leak_test is about 1GB. I think the remaining part (8-2.6-1) is occupied by fds. I suggest setting this to a default value like 500. > rocksdb max open file can lead to oom > -- > > Key: FLINK-31225 > URL: https://issues.apache.org/jira/browse/FLINK-31225 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > Attachments: image-2023-02-26-12-08-49-717.png, leak_test(2).png > > > the default value for > state.backend.rocksdb.files.open > is -1 > > [https://github.com/facebook/rocksdb/issues/4112] this issue told us that > rocksdb will not close fds, so this can lead to an oom issue. > > also I can reproduce the situation in my environment. the left part (2/21-2/24) > leaves max open files at -1, the right part (2/24 till now) sets max open files > to 300. the memory growth is very different. > !image-2023-02-26-12-08-49-717.png|width=616,height=285! > > I have also attached a jeprof for the 2/21-2/24 instance; the tm memory size is > about 8GB, heap memory is about 2.6GB, and the native part in leak_test is about > 1GB. I think the remaining part (8-2.6-1) is occupied by fds. > > I suggest setting this to a default value like 500. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31225) rocksdb max open file can lead to oom
[ https://issues.apache.org/jira/browse/FLINK-31225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-31225: -- Description: the default value for state.backend.rocksdb.files.open is -1 [https://github.com/facebook/rocksdb/issues/4112] this issue told us that rocksdb will not close fds, so this can lead to an oom issue. also I can reproduce the situation in my environment. the left part (2/21-2/24) leaves max open files at -1, the right part (2/24 till now) sets max open files to 300. the memory growth is very different. !image-2023-02-26-12-08-49-717.png|width=616,height=285! I have also attached a jeprof for the 2/21-2/24 instance; the tm memory size is about 8GB, heap memory is about 2.6GB, and the native part in leak_test is about 1GB. I think the remaining part (8-2.6-1) is occupied by fds. I suggest setting this to a default value like 500. was: the default value for state.backend.rocksdb.files.open is -1 [https://github.com/facebook/rocksdb/issues/4112] this issue told us that rocksdb will not close fds, so this can lead to an oom issue. also I can reproduce the situation in my environment. the left part (2/21-2/24) leaves max open files at -1, the right part (2/24 till now) sets max open files to 300. the memory growth is very different. !image-2023-02-26-12-08-49-717.png|width=616,height=285! I suggest setting this to a default value like 500. > rocksdb max open file can lead to oom > -- > > Key: FLINK-31225 > URL: https://issues.apache.org/jira/browse/FLINK-31225 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > Attachments: image-2023-02-26-12-08-49-717.png, leak_test(2).png > > > the default value for > state.backend.rocksdb.files.open > is -1 > > [https://github.com/facebook/rocksdb/issues/4112] this issue told us that > rocksdb will not close fds, so this can lead to an oom issue. > > also I can reproduce the situation in my environment. the left part (2/21-2/24) > leaves max open files at -1, the right part (2/24 till now) sets max open files > to 300. the memory growth is very different. > !image-2023-02-26-12-08-49-717.png|width=616,height=285! > > I have also attached a jeprof for the 2/21-2/24 instance; the tm memory size is > about 8GB, heap memory is about 2.6GB, and the native part in leak_test is about > 1GB. I think the remaining part (8-2.6-1) is occupied by fds. > > I suggest setting this to a default value like 500. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31225) rocksdb max open file can lead to oom
[ https://issues.apache.org/jira/browse/FLINK-31225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-31225: -- Attachment: leak_test(2).png
> rocksdb max open file can lead to oom
> --
>
> Key: FLINK-31225
> URL: https://issues.apache.org/jira/browse/FLINK-31225
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-26-12-08-49-717.png, leak_test(2).png
>
>
> the default value for state.backend.rocksdb.files.open is -1.
>
> [https://github.com/facebook/rocksdb/issues/4112] tells us that RocksDB will
> not close file descriptors in this mode, so this can lead to an OOM issue.
>
> I can also reproduce the situation in my environment: the left part (2/21 -
> 2/24) leaves max open files at -1, the right part (2/24 till now) sets max
> open files to 300, and the memory growth is very different.
> !image-2023-02-26-12-08-49-717.png|width=616,height=285!
>
> I suggest setting this to a default value like 500.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31225) rocksdb max open file can lead to oom
[ https://issues.apache.org/jira/browse/FLINK-31225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-31225: -- Description: the default value for state.backend.rocksdb.files.open is -1. [https://github.com/facebook/rocksdb/issues/4112] tells us that RocksDB will not close file descriptors in this mode, so this can lead to an OOM issue. I can also reproduce the situation in my environment: the left part (2/21 - 2/24) leaves max open files at -1, the right part (2/24 till now) sets max open files to 300, and the memory growth is very different. !image-2023-02-26-12-08-49-717.png|width=616,height=285! I suggest setting this to a default value like 500.
> rocksdb max open file can lead to oom
> --
>
> Key: FLINK-31225
> URL: https://issues.apache.org/jira/browse/FLINK-31225
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-26-12-08-49-717.png
>
>
> the default value for state.backend.rocksdb.files.open is -1.
>
> [https://github.com/facebook/rocksdb/issues/4112] tells us that RocksDB will
> not close file descriptors in this mode, so this can lead to an OOM issue.
>
> I can also reproduce the situation in my environment: the left part (2/21 -
> 2/24) leaves max open files at -1, the right part (2/24 till now) sets max
> open files to 300, and the memory growth is very different.
> !image-2023-02-26-12-08-49-717.png|width=616,height=285!
>
> I suggest setting this to a default value like 500.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31225) rocksdb max open file can lead to oom
[ https://issues.apache.org/jira/browse/FLINK-31225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaogang zhou updated FLINK-31225: -- Attachment: image-2023-02-26-12-08-49-717.png > rocksdb max open file can lead to oom > -- > > Key: FLINK-31225 > URL: https://issues.apache.org/jira/browse/FLINK-31225 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > Attachments: image-2023-02-26-12-08-49-717.png > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit
[ https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693581#comment-17693581 ] xiaogang zhou edited comment on FLINK-31089 at 2/26/23 4:05 AM: [~Yanfei Lei] yes, your summary is pretty accurate, except that pinning L0 can improve performance, while disabling it does not hurt much. But this is not the main topic. My job is a DataStream job; my point is to prompt a warning, since developers may forget to set the StateTtlConfig while turning on PinTopLevelIndexAndFilter, which can 100% lead to an OOM issue. I also have a new issue that can lead to OOM: https://issues.apache.org/jira/browse/FLINK-31225 was (Author: zhoujira86): [~Yanfei Lei] yes, your summary is pretty accurate, except that pinning L0 can improve performance, while disabling it does not hurt much. But this is not the main topic. My job is a DataStream job; my point is to prompt a warning, since developers may forget to set the StateTtlConfig while turning on PinTopLevelIndexAndFilter, which can 100% lead to an OOM issue.
> pin L0 index in memory can lead to slow memory grow finally lead to memory
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png,
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png,
> l0pin_open.png
>
>
> with setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned
> memory keep growing (in the picture below, from 48G -> 50G in about 5 hours).
> But if we switch it to false, the pinned memory stays relatively static. In
> our environment, a lot of tasks restart because they exceed the memory limit
> and are killed by k8s.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> the two graphs were recorded yesterday and today, which means the number of
> records per second in the data stream does not differ a lot.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
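To make the StateTtlConfig point above concrete, here is a minimal sketch (the state name and the 36-hour TTL are illustrative, the TTL value mirroring a figure mentioned later in this thread) of enabling TTL on a DataStream state descriptor so that the RocksDB compaction filter is installed:
{code:java}
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class TtlExample {
    public static ValueStateDescriptor<String> descriptorWithTtl() {
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(36))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                // enable the RocksDB compaction-filter cleanup discussed in this thread;
                // 1000 is the "query time after number of entries" threshold
                .cleanupInRocksdbCompactFilter(1000)
                .build();

        ValueStateDescriptor<String> descriptor =
                new ValueStateDescriptor<>("my-state", String.class);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}
{code}
With this in place, expired entries are dropped during compaction, which matches the observation elsewhere in this thread that the pinned block cache stopped growing once the compaction filter was active.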
[jira] [Created] (FLINK-31225) rocksdb max open file can lead to oom
xiaogang zhou created FLINK-31225: - Summary: rocksdb max open file can lead to oom Key: FLINK-31225 URL: https://issues.apache.org/jira/browse/FLINK-31225 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Affects Versions: 1.16.1 Reporter: xiaogang zhou -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit
[ https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693581#comment-17693581 ] xiaogang zhou commented on FLINK-31089: --- [~Yanfei Lei] yes, your summary is pretty accurate, except that pinning L0 can improve performance, while disabling it does not hurt much. But this is not the main topic. My job is a DataStream job; my point is to prompt a warning, since developers may forget to set the StateTtlConfig while turning on PinTopLevelIndexAndFilter, which can 100% lead to an OOM issue.
> pin L0 index in memory can lead to slow memory grow finally lead to memory
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png,
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png,
> l0pin_open.png
>
>
> with setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned
> memory keep growing (in the picture below, from 48G -> 50G in about 5 hours).
> But if we switch it to false, the pinned memory stays relatively static. In
> our environment, a lot of tasks restart because they exceed the memory limit
> and are killed by k8s.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> the two graphs were recorded yesterday and today, which means the number of
> records per second in the data stream does not differ a lot.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit
[ https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692566#comment-17692566 ] xiaogang zhou commented on FLINK-31089: --- some more info:
1. the task with TTL on has been running for a long time without pinned block cache growth;
2. we have many tasks running on 1.13, which means they do not have the fix https://issues.apache.org/jira/browse/FLINK-22957; these tasks also have partitioned-index-filters on, and they also OOM occasionally.
> pin L0 index in memory can lead to slow memory grow finally lead to memory
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png,
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png,
> l0pin_open.png
>
>
> with setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned
> memory keep growing (in the picture below, from 48G -> 50G in about 5 hours).
> But if we switch it to false, the pinned memory stays relatively static. In
> our environment, a lot of tasks restart because they exceed the memory limit
> and are killed by k8s.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> the two graphs were recorded yesterday and today, which means the number of
> records per second in the data stream does not differ a lot.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit
[ https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692546#comment-17692546 ] xiaogang zhou commented on FLINK-31089: --- [~yunta] if I turn off PinTopLevelIndexAndFilter, the task cannot run correctly, as it takes a lot of time to load the cache. I also found that some rank operators do not have a compaction filter in the LOG file.
> pin L0 index in memory can lead to slow memory grow finally lead to memory
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png,
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png,
> l0pin_open.png
>
>
> with setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned
> memory keep growing (in the picture below, from 48G -> 50G in about 5 hours).
> But if we switch it to false, the pinned memory stays relatively static. In
> our environment, a lot of tasks restart because they exceed the memory limit
> and are killed by k8s.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> the two graphs were recorded yesterday and today, which means the number of
> records per second in the data stream does not differ a lot.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
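For concreteness, the RocksDB settings being debated in these two tickets can be expressed as a custom RocksDBOptionsFactory. This is a sketch only: the class name is illustrative, and the values (max open files 500, pinning enabled) mirror figures from this thread rather than recommendations:
{code:java}
import java.util.Collection;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.IndexType;

public class PinnedIndexOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(
            DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // bound the number of open SST file descriptors (FLINK-31225);
        // the default -1 keeps every file open
        return currentOptions.setMaxOpenFiles(500);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig()
                .setIndexType(IndexType.kTwoLevelIndexSearch) // partitioned index/filters
                .setPartitionFilters(true)
                .setPinTopLevelIndexAndFilter(true)
                // the option under discussion: pins L0 index/filter blocks in the block cache
                .setPinL0FilterAndIndexBlocksInCache(true);
        return currentOptions.setTableFormatConfig(tableConfig);
    }
}
{code}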
[jira] [Comment Edited] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit
[ https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690346#comment-17690346 ] xiaogang zhou edited comment on FLINK-31089 at 2/21/23 2:38 AM: yes, this is not large at startup, but it keeps growing, so no matter how large the TM memory is, it will eventually OOM. Now I have started another task with setPinL0FilterAndIndexBlocksInCache true, which will have a faster growing speed. I will collect another visual profile at the weekend and will post it here. was (Author: zhoujira86): yes, this is not large at startup, but it keeps growing, so no matter how large the TM memory is, it will eventually OOM. Now I have started another task with setPinL0FilterAndIndexBlocksInCache true, which will have a faster growing speed. I will collect another visual profile at the weekend and will post it here. And I think it will be convenient to communicate via dingding; I am at an e-commerce company in Shanghai, in charge of flink. you can send dingding to my mail [zhou16...@163.com. Maybe I can get some answer from your side :)|mailto:zhou16...@163.com%E3%80%82]
> pin L0 index in memory can lead to slow memory grow finally lead to memory
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png,
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png,
> l0pin_open.png
>
>
> with setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned
> memory keep growing (in the picture below, from 48G -> 50G in about 5 hours).
> But if we switch it to false, the pinned memory stays relatively static. In
> our environment, a lot of tasks restart because they exceed the memory limit
> and are killed by k8s.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> the two graphs were recorded yesterday and today, which means the number of
> records per second in the data stream does not differ a lot.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit
[ https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17691338#comment-17691338 ] xiaogang zhou commented on FLINK-31089: --- [~yunta] after turning on the compaction filter, the pinned block cache size stopped growing. Should we add a warning for the situation where 'partitioned-index-filters' is on and no TTL is configured?
> pin L0 index in memory can lead to slow memory grow finally lead to memory
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png,
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png,
> l0pin_open.png
>
>
> with setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned
> memory keep growing (in the picture below, from 48G -> 50G in about 5 hours).
> But if we switch it to false, the pinned memory stays relatively static. In
> our environment, a lot of tasks restart because they exceed the memory limit
> and are killed by k8s.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> the two graphs were recorded yesterday and today, which means the number of
> records per second in the data stream does not differ a lot.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
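As a purely hypothetical illustration of the warning proposed above (only the option key is real; the check, class, and its placement do not exist in Flink):
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TtlSanityCheck {
    private static final Logger LOG = LoggerFactory.getLogger(TtlSanityCheck.class);

    // hypothetical check: both flags would have to be derived from the
    // effective state backend configuration at job startup
    public static void warnIfRisky(boolean partitionedIndexFiltersEnabled, boolean ttlConfigured) {
        if (partitionedIndexFiltersEnabled && !ttlConfigured) {
            LOG.warn("'state.backend.rocksdb.memory.partitioned-index-filters' is enabled "
                    + "but no state TTL is configured; pinned index/filter blocks may grow "
                    + "without bound and eventually exceed the process memory limit.");
        }
    }
}
{code}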
[jira] [Commented] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit
[ https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17691153#comment-17691153 ] xiaogang zhou commented on FLINK-31089: --- I have created another task with TTL on, and will keep monitoring the memory growth.
> pin L0 index in memory can lead to slow memory grow finally lead to memory
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png,
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png,
> l0pin_open.png
>
>
> with setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned
> memory keep growing (in the picture below, from 48G -> 50G in about 5 hours).
> But if we switch it to false, the pinned memory stays relatively static. In
> our environment, a lot of tasks restart because they exceed the memory limit
> and are killed by k8s.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> the two graphs were recorded yesterday and today, which means the number of
> records per second in the data stream does not differ a lot.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit
[ https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17691114#comment-17691114 ] xiaogang zhou edited comment on FLINK-31089 at 2/20/23 10:10 AM: [~yunta] I found this in the rocksdb LOG:
2023/02/20-17:55:33.357582 7f4092f42700 Options.compaction_filter: None
2023/02/20-17:55:33.357583 7f4092f42700 Options.compaction_filter_factory: None
could this lead to the index 'oom' issue? was (Author: zhoujira86): I found this in the rocksdb LOG:
2023/02/20-17:55:33.357582 7f4092f42700 Options.compaction_filter: None
2023/02/20-17:55:33.357583 7f4092f42700 Options.compaction_filter_factory: None
could this lead to the index 'oom' issue?
> pin L0 index in memory can lead to slow memory grow finally lead to memory
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png,
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png,
> l0pin_open.png
>
>
> with setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned
> memory keep growing (in the picture below, from 48G -> 50G in about 5 hours).
> But if we switch it to false, the pinned memory stays relatively static. In
> our environment, a lot of tasks restart because they exceed the memory limit
> and are killed by k8s.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> the two graphs were recorded yesterday and today, which means the number of
> records per second in the data stream does not differ a lot.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit
[ https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17691114#comment-17691114 ] xiaogang zhou commented on FLINK-31089: --- I found this in the rocksdb LOG:
2023/02/20-17:55:33.357582 7f4092f42700 Options.compaction_filter: None
2023/02/20-17:55:33.357583 7f4092f42700 Options.compaction_filter_factory: None
could this lead to the index 'oom' issue?
> pin L0 index in memory can lead to slow memory grow finally lead to memory
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png,
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png,
> l0pin_open.png
>
>
> with setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned
> memory keep growing (in the picture below, from 48G -> 50G in about 5 hours).
> But if we switch it to false, the pinned memory stays relatively static. In
> our environment, a lot of tasks restart because they exceed the memory limit
> and are killed by k8s.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> the two graphs were recorded yesterday and today, which means the number of
> records per second in the data stream does not differ a lot.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit
[ https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690999#comment-17690999 ] xiaogang zhou edited comment on FLINK-31089 at 2/20/23 4:56 AM: [~yunta] got some updates with L0 pin on, see the attached l0pin_open. was (Author: zhoujira86): [~yunta] got some updates with L0 pin on, see the attached l0pin_open. I configured table.exec.state.ttl to 36 hrs; I suspect it does not change the rocksdb default ttl configuration?
> pin L0 index in memory can lead to slow memory grow finally lead to memory
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png,
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png,
> l0pin_open.png
>
>
> with setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned
> memory keep growing (in the picture below, from 48G -> 50G in about 5 hours).
> But if we switch it to false, the pinned memory stays relatively static. In
> our environment, a lot of tasks restart because they exceed the memory limit
> and are killed by k8s.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> the two graphs were recorded yesterday and today, which means the number of
> records per second in the data stream does not differ a lot.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
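For completeness, a minimal sketch of setting the table.exec.state.ttl value referred to above for a SQL job; the "36 h" mirrors the figure from the comment, and the API is as in the affected 1.16 line:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlTtlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // idle state retention for SQL operators (e.g. rank); drives the
        // RocksDB compaction filter for SQL state
        tEnv.getConfig().getConfiguration().setString("table.exec.state.ttl", "36 h");
    }
}
{code}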