[jira] [Commented] (FLINK-33971) Specifies whether to use an HBase table that supports dynamic columns.
[ https://issues.apache.org/jira/browse/FLINK-33971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802045#comment-17802045 ] MOBIN commented on FLINK-33971: --- https://github.com/apache/flink-connector-hbase/pull/36 > Specifies whether to use an HBase table that supports dynamic columns. > --- > > Key: FLINK-33971 > URL: https://issues.apache.org/jira/browse/FLINK-33971 > Project: Flink > Issue Type: New Feature > Components: Connectors / HBase >Reporter: MOBIN >Priority: Minor > > Specifies whether to use an HBase table that supports dynamic columns. > Refer to the dynamic.table parameter in this document: > https://www.alibabacloud.com/help/en/flink/developer-reference/apsaradb-for-hbase-connector#section-ltp-3fy-9qv > Sample code for a result table that supports dynamic columns: > CREATE TEMPORARY TABLE datagen_source ( > id INT, > f1hour STRING, > f1deal BIGINT, > f2day STRING, > f2deal BIGINT > ) WITH ( > 'connector'='datagen' > ); > CREATE TEMPORARY TABLE hbase_sink ( > rowkey INT, > f1 ROW<`hour` STRING, deal BIGINT>, > f2 ROW<`day` STRING, deal BIGINT> > ) WITH ( > 'connector'='hbase-2.2', > 'table-name'='', > 'zookeeper.quorum'='', > 'dynamic.table'='true' > ); > INSERT INTO hbase_sink > SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source; > If dynamic.table is set to true, an HBase table that supports dynamic columns is > used. > Two fields must be declared in the ROW that corresponds to each column > family: the value of the first field indicates the dynamic column, and the > value of the second field indicates the value of the dynamic column. > For example, suppose the datagen_source table contains a row of data. The row > indicates that the ID of the commodity is 1, the transaction amount of the > commodity between 10:00 and 11:00 is 100, and the transaction amount of the > commodity on July 26, 2020 is 1. In this case, a row whose rowkey is 1 is > inserted into the ApsaraDB for HBase table: f1:10 is 100, and f2:2020-7-26 is > 1. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] (FLINK-33971) Specifies whether to use an HBase table that supports dynamic columns.
[ https://issues.apache.org/jira/browse/FLINK-33971 ] MOBIN deleted comment on FLINK-33971: --- was (Author: mobin): https://github.com/apache/flink-connector-hbase/pull/36 > Specifies whether to use an HBase table that supports dynamic columns. > --- > > Key: FLINK-33971 > URL: https://issues.apache.org/jira/browse/FLINK-33971 > Project: Flink > Issue Type: New Feature > Components: Connectors / HBase >Reporter: MOBIN >Priority: Minor > > Specifies whether to use an HBase table that supports dynamic columns. > Refer to the dynamic.table parameter in this document: > https://www.alibabacloud.com/help/en/flink/developer-reference/apsaradb-for-hbase-connector#section-ltp-3fy-9qv > Sample code for a result table that supports dynamic columns: > CREATE TEMPORARY TABLE datagen_source ( > id INT, > f1hour STRING, > f1deal BIGINT, > f2day STRING, > f2deal BIGINT > ) WITH ( > 'connector'='datagen' > ); > CREATE TEMPORARY TABLE hbase_sink ( > rowkey INT, > f1 ROW<`hour` STRING, deal BIGINT>, > f2 ROW<`day` STRING, deal BIGINT> > ) WITH ( > 'connector'='hbase-2.2', > 'table-name'='', > 'zookeeper.quorum'='', > 'dynamic.table'='true' > ); > INSERT INTO hbase_sink > SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source; > If dynamic.table is set to true, an HBase table that supports dynamic columns is > used. > Two fields must be declared in the ROW that corresponds to each column > family: the value of the first field indicates the dynamic column, and the > value of the second field indicates the value of the dynamic column. > For example, suppose the datagen_source table contains a row of data. The row > indicates that the ID of the commodity is 1, the transaction amount of the > commodity between 10:00 and 11:00 is 100, and the transaction amount of the > commodity on July 26, 2020 is 1. In this case, a row whose rowkey is 1 is > inserted into the ApsaraDB for HBase table: f1:10 is 100, and f2:2020-7-26 is > 1. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33971) Specifies whether to use an HBase table that supports dynamic columns.
MOBIN created FLINK-33971: - Summary: Specifies whether to use an HBase table that supports dynamic columns. Key: FLINK-33971 URL: https://issues.apache.org/jira/browse/FLINK-33971 Project: Flink Issue Type: New Feature Components: Connectors / HBase Reporter: MOBIN Specifies whether to use an HBase table that supports dynamic columns. Refer to the dynamic.table parameter in this document: https://www.alibabacloud.com/help/en/flink/developer-reference/apsaradb-for-hbase-connector#section-ltp-3fy-9qv Sample code for a result table that supports dynamic columns: CREATE TEMPORARY TABLE datagen_source ( id INT, f1hour STRING, f1deal BIGINT, f2day STRING, f2deal BIGINT ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE hbase_sink ( rowkey INT, f1 ROW<`hour` STRING, deal BIGINT>, f2 ROW<`day` STRING, deal BIGINT> ) WITH ( 'connector'='hbase-2.2', 'table-name'='', 'zookeeper.quorum'='', 'dynamic.table'='true' ); INSERT INTO hbase_sink SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source; If dynamic.table is set to true, an HBase table that supports dynamic columns is used. Two fields must be declared in the ROW that corresponds to each column family: the value of the first field indicates the dynamic column, and the value of the second field indicates the value of the dynamic column. For example, suppose the datagen_source table contains a row of data. The row indicates that the ID of the commodity is 1, the transaction amount of the commodity between 10:00 and 11:00 is 100, and the transaction amount of the commodity on July 26, 2020 is 1. In this case, a row whose rowkey is 1 is inserted into the ApsaraDB for HBase table: f1:10 is 100, and f2:2020-7-26 is 1. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-26694) LookupFunction doesn't support multilevel inheritance
[ https://issues.apache.org/jira/browse/FLINK-26694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu resolved FLINK-26694. Resolution: Fixed Fixed in master (1.19): 6e78eb18524ead3abd60da0ca41751b45e0e2482 > LookupFunction doesn't support multilevel inheritance > - > > Key: FLINK-26694 > URL: https://issues.apache.org/jira/browse/FLINK-26694 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.14.4 >Reporter: tsianglei >Assignee: Xianxun Ye >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available > Fix For: 1.19.0 > > Attachments: image-2022-03-17-11-19-21-269.png > > > When we implement a lookup function with multilevel inheritance, an error > occurs: > {code:java} > org.apache.flink.table.api.ValidationException: Could not determine a type > inference for lookup function 'default_catalog.default_database.dimTable'. > Lookup functions support regular type inference. However, for convenience, > the output class can simply be a Row or RowData class in which case the input > and output types are derived from the table's schema with default conversion. > at > org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.createLookupTypeInference(LookupJoinCodeGenerator.scala:270) > at > org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.generateLookupFunction(LookupJoinCodeGenerator.scala:166) > at > org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.generateAsyncLookupFunction(LookupJoinCodeGenerator.scala:118) > at > org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator.generateAsyncLookupFunction(LookupJoinCodeGenerator.scala) > at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.createAsyncLookupJoin(CommonExecLookupJoin.java:332) > at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.translateToPlanInternal(CommonExecLookupJoin.java:238) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250) > at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250) > at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:114) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:71) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:70) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at >
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:107) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:47) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:705) > at > org.apache.flink.table.api.internal.StatementSetImpl.explain(StatementSetImpl.java:118) > XXX > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:4
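For readers following the thread: the failing pattern is easy to reproduce. Below is a minimal sketch — the class names are hypothetical, not taken from the reporter's code — of a lookup TableFunction that reaches TableFunction&lt;Row&gt; only through an intermediate base class, i.e. the multilevel inheritance this ticket adds support for:

{code:java}
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Intermediate base class: the lookup function no longer extends
// TableFunction<Row> directly.
abstract class AbstractDimLookupFunction extends TableFunction<Row> {
    // shared connection or caching logic would typically live here
}

// Concrete lookup function, two inheritance levels below TableFunction<Row>.
class MyDimLookupFunction extends AbstractDimLookupFunction {
    public void eval(Integer key) {
        // look up the dimension row for the given key and emit it
        collect(Row.of(key, "dim-value"));
    }
}
{code}

Before the fix, registering a function like MyDimLookupFunction as the lookup table's function triggered the ValidationException quoted above, presumably because the planner's type inference only considered the direct superclass when deriving the output type.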
[jira] [Assigned] (FLINK-26694) LookupFunction doesn't support multilevel inheritance
[ https://issues.apache.org/jira/browse/FLINK-26694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu reassigned FLINK-26694: -- Assignee: Xianxun Ye > LookupFunction doesn't support multilevel inheritance > - > > Key: FLINK-26694 > URL: https://issues.apache.org/jira/browse/FLINK-26694 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.14.4 >Reporter: tsianglei >Assignee: Xianxun Ye >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available > Attachments: image-2022-03-17-11-19-21-269.png > > > When we implement a lookup function with multilevel inheritance, an error > occurs: > {code:java} > org.apache.flink.table.api.ValidationException: Could not determine a type > inference for lookup function 'default_catalog.default_database.dimTable'. > Lookup functions support regular type inference. However, for convenience, > the output class can simply be a Row or RowData class in which case the input > and output types are derived from the table's schema with default conversion. > at > org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.createLookupTypeInference(LookupJoinCodeGenerator.scala:270) > at > org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.generateLookupFunction(LookupJoinCodeGenerator.scala:166) > at > org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.generateAsyncLookupFunction(LookupJoinCodeGenerator.scala:118) > at > org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator.generateAsyncLookupFunction(LookupJoinCodeGenerator.scala) > at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.createAsyncLookupJoin(CommonExecLookupJoin.java:332) > at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.translateToPlanInternal(CommonExecLookupJoin.java:238) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250) > at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250) > at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:114) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:71) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:70) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70) > at >
org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:107) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:47) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:705) > at > org.apache.flink.table.api.internal.StatementSetImpl.explain(StatementSetImpl.java:118) > XXX > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.j
[jira] [Updated] (FLINK-26694) LookupFunction doesn't support multilevel inheritance
[ https://issues.apache.org/jira/browse/FLINK-26694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu updated FLINK-26694: --- Fix Version/s: 1.19.0 > LookupFunction doesn't support multilevel inheritance > - > > Key: FLINK-26694 > URL: https://issues.apache.org/jira/browse/FLINK-26694 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.14.4 >Reporter: tsianglei >Assignee: Xianxun Ye >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available > Fix For: 1.19.0 > > Attachments: image-2022-03-17-11-19-21-269.png > > > When we implement a lookup function with multilevel inheritance, an error > occurs: > {code:java} > org.apache.flink.table.api.ValidationException: Could not determine a type > inference for lookup function 'default_catalog.default_database.dimTable'. > Lookup functions support regular type inference. However, for convenience, > the output class can simply be a Row or RowData class in which case the input > and output types are derived from the table's schema with default conversion. > at > org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.createLookupTypeInference(LookupJoinCodeGenerator.scala:270) > at > org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.generateLookupFunction(LookupJoinCodeGenerator.scala:166) > at > org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.generateAsyncLookupFunction(LookupJoinCodeGenerator.scala:118) > at > org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator.generateAsyncLookupFunction(LookupJoinCodeGenerator.scala) > at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.createAsyncLookupJoin(CommonExecLookupJoin.java:332) > at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.translateToPlanInternal(CommonExecLookupJoin.java:238) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250) > at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250) > at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:114) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:71) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:70) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at >
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:107) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:47) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:705) > at > org.apache.flink.table.api.internal.StatementSetImpl.explain(StatementSetImpl.java:118) > XXX > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMeth
Re: [PR] [FLINK-26694][table] Support lookup join via a multi-level inheritance of TableFunction [flink]
leonardBang merged PR #23684: URL: https://github.com/apache/flink/pull/23684 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-20899][table-planner-blink] Fix ClassCastException when calculating cost in HepPlanner [flink]
leonardBang commented on PR #14588: URL: https://github.com/apache/flink/pull/14588#issuecomment-1874950162 Hey @godfreyhe, would you like to rebase this PR to fix the conflicts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Specifies whether to use an HBase table that supports dynamic columns. [flink-connector-hbase]
MOBIN-F opened a new pull request, #36: URL: https://github.com/apache/flink-connector-hbase/pull/36 Specifies whether to use an HBase table that supports dynamic columns. Refer to the dynamic.table parameter in this document: [https://www.alibabacloud.com/help/en/flink/developer-reference/apsaradb-for-hbase-connector#section-ltp-3fy-9qv](https://www.alibabacloud.com/help/en/flink/developer-reference/apsaradb-for-hbase-connector#section-ltp-3fy-9qv) Sample code for a result table that supports dynamic columns: ``` CREATE TEMPORARY TABLE datagen_source ( id INT, f1hour STRING, f1deal BIGINT, f2day STRING, f2deal BIGINT ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE hbase_sink ( rowkey INT, f1 ROW<`hour` STRING, deal BIGINT>, f2 ROW<`day` STRING, deal BIGINT> ) WITH ( 'connector'='hbase-2.2', 'table-name'='', 'zookeeper.quorum'='', 'dynamic.table'='true' ); INSERT INTO hbase_sink SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source; ``` If dynamic.table is set to true, an HBase table that supports dynamic columns is used. Two fields must be declared in the ROW that corresponds to each column family: the value of the first field indicates the dynamic column, and the value of the second field indicates the value of the dynamic column. For example, suppose the datagen_source table contains a row of data. The row indicates that the ID of the commodity is 1, the transaction amount of the commodity between 10:00 and 11:00 is 100, and the transaction amount of the commodity on July 26, 2020 is 1. In this case, a row whose rowkey is 1 is inserted into the ApsaraDB for HBase table: f1:10 is 100, and f2:2020-7-26 is 1. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
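To make the column mapping concrete, here is an illustrative sketch — not the connector's actual sink code — of the HBase `Put` that conceptually corresponds to one source row `(id=1, f1hour='10', f1deal=100, f2day='2020-7-26', f2deal=1)` under `'dynamic.table'='true'`, written against the standard HBase client API:

```java
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class DynamicColumnMappingSketch {
    // Builds the Put that the sink would conceptually issue for the example row.
    public static Put exampleRow() {
        Put put = new Put(Bytes.toBytes(1)); // rowkey = id
        // family f1: dynamic qualifier comes from f1hour, cell value from f1deal
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("10"), Bytes.toBytes(100L));
        // family f2: dynamic qualifier comes from f2day, cell value from f2deal
        put.addColumn(Bytes.toBytes("f2"), Bytes.toBytes("2020-7-26"), Bytes.toBytes(1L));
        return put;
    }
}
```

Reading the mapping back, a scan of row 1 would show the two cells f1:10=100 and f2:2020-7-26=1, matching the description above.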
Re: [PR] Specifies whether to use an HBase table that supports dynamic columns. [flink-connector-hbase]
boring-cyborg[bot] commented on PR #36: URL: https://github.com/apache/flink-connector-hbase/pull/36#issuecomment-1874935171 Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32958) Support VIEW as a source table in CREATE TABLE ... LIKE statement
[ https://issues.apache.org/jira/browse/FLINK-32958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802032#comment-17802032 ] Martijn Visser commented on FLINK-32958: Thanks [~fornaix]! > Support VIEW as a source table in CREATE TABLE ... LIKE statement > - > > Key: FLINK-32958 > URL: https://issues.apache.org/jira/browse/FLINK-32958 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.17.1 >Reporter: Han >Priority: Major > Labels: pull-request-available > > We can't create a table from a view through the CREATE TABLE ... LIKE statement. > > case 1: > {code:sql} > create view source_view as select id,val from source; > create table sink with ('connector' = 'print') like source_view (excluding > all); > insert into sink select * from source_view;{code} > case 2: > {code:java} > DataStreamSource source = ...; > tEnv.createTemporaryView("source", source); > tEnv.executeSql("create table sink with ('connector' = 'print') like source > (excluding all)"); > tEnv.executeSql("insert into sink select * from source");{code} > > The above cases will throw an exception: > {code:java} > Source table '`default_catalog`.`default_database`.`source`' of the LIKE > clause can not be a VIEW{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33964) Flink documentation can't be built due to an error in Pulsar docs
[ https://issues.apache.org/jira/browse/FLINK-33964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802031#comment-17802031 ] Martijn Visser commented on FLINK-33964: Thanks [~leonard] [~Tison]! > Flink documentation can't be built due to an error in Pulsar docs > -- > > Key: FLINK-33964 > URL: https://issues.apache.org/jira/browse/FLINK-33964 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar, Documentation >Affects Versions: pulsar-4.1.0 >Reporter: Martijn Visser >Assignee: Leonard Xu >Priority: Blocker > Labels: pull-request-available > Fix For: pulsar-4.1.1 > > > https://github.com/apache/flink/actions/runs/7380766702/job/20078487743 > {code:java} > Start building sites … > hugo v0.110.0-e32a493b7826d02763c3b79623952e625402b168+extended linux/amd64 > BuildDate=2023-01-17T12:16:09Z VendorInfo=gohugoio > Error: Error building site: > "/root/flink/docs/themes/connectors/content.zh/docs/connectors/datastream/pulsar.md:491:1": > failed to extract shortcode: template for shortcode > "generated/pulsar_admin_configuration" not found > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-33970) Add necessary checks for connector document
[ https://issues.apache.org/jira/browse/FLINK-33970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu reassigned FLINK-33970: -- Assignee: Zhongqiang Gong > Add necessary checks for connector document > --- > > Key: FLINK-33970 > URL: https://issues.apache.org/jira/browse/FLINK-33970 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Leonard Xu >Assignee: Zhongqiang Gong >Priority: Major > > In FLINK-33964, we found that the documentation files in independent connector > repos lack basic checks such as broken-URL detection; this ticket aims to add the > necessary checks to avoid similar issues. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33970) Add necessary checks for connector document
[ https://issues.apache.org/jira/browse/FLINK-33970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802025#comment-17802025 ] Zhongqiang Gong commented on FLINK-33970: - [~leonard] I am willing to take this issue. Please assign it to me. Thank you~ > Add necessary checks for connector document > --- > > Key: FLINK-33970 > URL: https://issues.apache.org/jira/browse/FLINK-33970 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Leonard Xu >Priority: Major > > In FLINK-33964, we found that the documentation files in independent connector > repos lack basic checks such as broken-URL detection; this ticket aims to add the > necessary checks to avoid similar issues. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33970) Add necessary checks for connector document
[ https://issues.apache.org/jira/browse/FLINK-33970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu updated FLINK-33970: --- Description: In FLINK-33964, we found that the documentation files in independent connector repos lack basic checks such as broken-URL detection; this ticket aims to add the necessary checks and avoid similar issues. > Add necessary checks for connector document > --- > > Key: FLINK-33970 > URL: https://issues.apache.org/jira/browse/FLINK-33970 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Leonard Xu >Priority: Major > > In FLINK-33964, we found that the documentation files in independent connector > repos lack basic checks such as broken-URL detection; this ticket aims to add the > necessary checks to avoid similar issues. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33970) Add necessary checks for connector document
Leonard Xu created FLINK-33970: -- Summary: Add necessary checks for connector document Key: FLINK-33970 URL: https://issues.apache.org/jira/browse/FLINK-33970 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Leonard Xu -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]
WencongLiu commented on code in PR #23905: URL: https://github.com/apache/flink/pull/23905#discussion_r1440107366 ## flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java: ## @@ -39,7 +41,16 @@ public interface SourceReaderContext { */ String getLocalHostName(); -/** @return The index of this subtask. */ +/** + * Get the index of this subtask. + * + * @deprecated This method is deprecated since Flink 1.19. All metadata about the task should be + * provided uniformly by {@link #getTaskInfo()}. + * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs";> + * FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs + */ +@Deprecated int getIndexOfSubtask(); Review Comment: For these deprecated methods in the modified interfaces, I've changed them from abstract to default and provided default implementations. ## flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java: ## @@ -84,14 +88,26 @@ enum FailureType { * Get the ID of the job. * * @return the ID of the job + * @deprecated This method is deprecated since Flink 1.19. All metadata about the job should + * be provided uniformly by {@link #getJobInfo()}. + * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs";> + * FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs */ +@Deprecated Review Comment: For these deprecated methods in the modified interfaces, I've changed them from abstract to default and provided default implementations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]
WencongLiu commented on code in PR #23905: URL: https://github.com/apache/flink/pull/23905#discussion_r1440106348 ## flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java: ## @@ -63,14 +67,27 @@ public interface RuntimeContext { /** * The ID of the current job. Note that Job ID can change in particular upon manual restart. The * returned ID should NOT be used for any job management tasks. + * + * @deprecated This method is deprecated since Flink 1.19. All metadata about the job should be + * provided uniformly by {@link #getJobInfo()}. + * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs";> + * FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs */ +@Deprecated JobID getJobId(); Review Comment: Good point! For these deprecated methods in the modified interfaces, I've changed them from abstract to default and provided default implementations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
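To illustrate the intended migration path for user code, here is a minimal sketch — assuming the `getJobInfo()`/`getTaskInfo()` accessors as discussed in this PR, with a hypothetical user-function name — of moving a rich function off the deprecated per-field getters:

```java
import org.apache.flink.api.common.functions.RichMapFunction;

public class MetadataAwareMapper extends RichMapFunction<String, String> {
    @Override
    public String map(String value) {
        // Before (deprecated since 1.19):
        //   getRuntimeContext().getJobId();
        //   getRuntimeContext().getIndexOfThisSubtask();
        // After: all job/task metadata comes from the unified info objects.
        String jobId = getRuntimeContext().getJobInfo().getJobId().toString();
        int subtask = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
        return value + " @ job " + jobId + ", subtask " + subtask;
    }
}
```

The design keeps context-like interfaces lean: new metadata fields can be added to JobInfo/TaskInfo without touching every context interface again.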
Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]
WencongLiu commented on code in PR #23905: URL: https://github.com/apache/flink/pull/23905#discussion_r1440105816 ## flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java: ## @@ -166,7 +169,10 @@ public class Task /** ID which identifies the slot in which the task is supposed to run. */ private final AllocationID allocationId; -/** TaskInfo object for this task. */ +/** The meta information of current job. */ +private final JobInfo jobInfo; Review Comment: These code changes are a prerequisite for FLIP-382. I've moved them to the commit `[FLINK-33905][core] Add getJobInfo() method to Environment`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]
WencongLiu commented on code in PR #23905: URL: https://github.com/apache/flink/pull/23905#discussion_r1440107273 ## flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java: ## @@ -119,7 +120,14 @@ public interface Environment { Configuration getJobConfiguration(); /** - * Returns the {@link TaskInfo} object associated with this subtask + * Returns the {@link JobInfo} object associated with current job. + * + * @return JobInfo for current job + */ +JobInfo getJobInfo(); Review Comment: These code changes are a prerequisite for FLIP-382. I've moved them to the commit `[FLINK-33905][core] Add getJobInfo() method to Environment`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]
WencongLiu commented on code in PR #23905: URL: https://github.com/apache/flink/pull/23905#discussion_r1440108370 ## flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java: ## @@ -84,14 +88,26 @@ enum FailureType { * Get the ID of the job. * * @return the ID of the job + * @deprecated This method is deprecated since Flink 1.19. All metadata about the job should + * be provided uniformly by {@link #getJobInfo()}. + * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs";> + * FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs */ +@Deprecated Review Comment: For these deprecated methods in the modified interfaces, I've changed them from abstract to default and provided default implementations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]
WencongLiu commented on code in PR #23905: URL: https://github.com/apache/flink/pull/23905#discussion_r1440108479 ## flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java: ## @@ -124,5 +140,16 @@ enum FailureType { * @return the Executor pool */ Executor getIOExecutor(); + +/** + * Get the meta information of current job. + * + * @return the job meta information. + */ +@PublicEvolving +@Nullable +default JobInfo getJobInfo() { Review Comment: I've changed the default methods including `getJobInfo()` and `getTaskInfo()` to be abstract. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]
WencongLiu commented on code in PR #23905: URL: https://github.com/apache/flink/pull/23905#discussion_r1440108007 ## flink-core/src/main/java/org/apache/flink/api/connector/sink2/InitContext.java: ## @@ -35,16 +40,37 @@ public interface InitContext { */ long INITIAL_CHECKPOINT_ID = 1; -/** @return The id of task where the committer is running. */ +/** + * Get the id of task where the committer is running. + * + * @deprecated This method is deprecated since Flink 1.19. All metadata about the task should be + * provided uniformly by {@link #getTaskInfo()}. + * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs";> + * FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs + */ int getSubtaskId(); Review Comment: I have annotated these deprecated methods with `@Deprecated` in the `InitContext`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]
WencongLiu commented on code in PR #23905: URL: https://github.com/apache/flink/pull/23905#discussion_r1440107496 ## flink-core/src/main/java/org/apache/flink/api/connector/sink2/InitContext.java: ## @@ -57,6 +83,34 @@ public interface InitContext { /** * The ID of the current job. Note that Job ID can change in particular upon manual restart. The * returned ID should NOT be used for any job management tasks. + * + * @deprecated This method is deprecated since Flink 1.19. All metadata about the job should be + * provided uniformly by {@link #getJobInfo()}. + * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs";> + * FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs */ JobID getJobId(); + +/** + * Get the meta information of current job. + * + * @return the job meta information. + */ +@PublicEvolving +@Nullable +default JobInfo getJobInfo() { +return null; +} + +/** + * Get the meta information of current task. + * + * @return the task meta information. + */ +@PublicEvolving +@Nullable +default TaskInfo getTaskInfo() { +return null; +} Review Comment: I've changed the default methods including `getJobInfo()` and `getTaskInfo()` to be abstract. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]
WencongLiu commented on code in PR #23905: URL: https://github.com/apache/flink/pull/23905#discussion_r1440107425 ## flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java: ## @@ -68,8 +79,24 @@ public interface SourceReaderContext { * Get the current parallelism of this Source. * * @return the parallelism of the Source. + * @deprecated This method is deprecated since Flink 1.19. All metadata about the task should be + * provided uniformly by {@link #getTaskInfo()}. + * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs";> + * FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs */ +@Deprecated default int currentParallelism() { throw new UnsupportedOperationException(); } + +/** + * Get the meta information of current task. + * + * @return the task meta information. + */ +@PublicEvolving +default TaskInfo getTaskInfo() { +return null; Review Comment: I've changed the default methods including `getJobInfo()` and `getTaskInfo()` to be abstract. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]
WencongLiu commented on code in PR #23905: URL: https://github.com/apache/flink/pull/23905#discussion_r1440107273 ## flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java: ## @@ -119,7 +120,14 @@ public interface Environment { Configuration getJobConfiguration(); /** - * Returns the {@link TaskInfo} object associated with this subtask + * Returns the {@link JobInfo} object associated with current job. + * + * @return JobInfo for current job + */ +JobInfo getJobInfo(); Review Comment: These code changes are a prerequisite for FLIP-382. I've moved them to the commit `[FLINK-33905][core] Add getJobInfo() method to Environment`. ## flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java: ## @@ -39,7 +41,16 @@ public interface SourceReaderContext { */ String getLocalHostName(); -/** @return The index of this subtask. */ +/** + * Get the index of this subtask. + * + * @deprecated This method is deprecated since Flink 1.19. All metadata about the task should be + * provided uniformly by {@link #getTaskInfo()}. + * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs";> + * FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs + */ +@Deprecated int getIndexOfSubtask(); Review Comment: For these deprecated methods in the modified interfaces, I've changed them from abstract to default and provided default implementations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]
WencongLiu commented on code in PR #23905: URL: https://github.com/apache/flink/pull/23905#discussion_r1440107033 ## flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSinkTest.java: ## @@ -127,7 +127,7 @@ public Long timestamp() { }; private static RuntimeContext getMockRuntimeContext() { -return new MockStreamingRuntimeContext(false, 0, 0) { +return new MockStreamingRuntimeContext(false, 1, 0) { Review Comment: These tests conflict with the newly added correctness check in `TaskInfoImpl`. For example, the subtask ID shouldn't be larger than or equal to the operator parallelism. I've fixed these errors in the commit `[hotfix] Fix the logical errors for the tests related to task metadata`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]
WencongLiu commented on code in PR #23905: URL: https://github.com/apache/flink/pull/23905#discussion_r1440106774 ## flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java: ## @@ -110,38 +111,41 @@ public void testCepRuntimeContext() { final String taskName = "foobarTask"; final OperatorMetricGroup metricGroup = UnregisteredMetricsGroup.createOperatorMetricGroup(); -final int numberOfParallelSubtasks = 42; -final int indexOfSubtask = 43; +final int numberOfParallelSubtasks = 43; +final int indexOfSubtask = 42; final int attemptNumber = 1337; -final String taskNameWithSubtask = "barfoo"; +final String taskNameWithSubtask = "foobarTask (43/43)#1337"; Review Comment: These tests conflict with the newly added correctness check in `TaskInfoImpl`. For example, the subtask ID shouldn't be larger than or equal to the operator parallelism. I've fixed these errors in the commit `[hotfix] Fix the logical errors for the tests related to task metadata`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
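For context, the check these adjusted tests were tripping over looks roughly like the following — an assumed shape based on the discussion, not the exact `TaskInfoImpl` source:

```java
import static org.apache.flink.util.Preconditions.checkArgument;

class SubtaskIndexCheckSketch {
    // A subtask index is only valid if it lies strictly below the parallelism,
    // so e.g. (parallelism=0, subtask=0) or (parallelism=42, subtask=43) fail.
    static void validate(int indexOfThisSubtask, int numberOfParallelSubtasks) {
        checkArgument(numberOfParallelSubtasks > 0, "Parallelism must be positive.");
        checkArgument(
                indexOfThisSubtask >= 0
                        && indexOfThisSubtask < numberOfParallelSubtasks,
                "Subtask index %s is out of range [0, %s).",
                indexOfThisSubtask,
                numberOfParallelSubtasks);
    }
}
```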
Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]
WencongLiu commented on code in PR #23905: URL: https://github.com/apache/flink/pull/23905#discussion_r1440106348 ## flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java: ## @@ -63,14 +67,27 @@ public interface RuntimeContext { /** * The ID of the current job. Note that Job ID can change in particular upon manual restart. The * returned ID should NOT be used for any job management tasks. + * + * @deprecated This method is deprecated since Flink 1.19. All metadata about the job should be + * provided uniformly by {@link #getJobInfo()}. + * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs";> + * FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs */ +@Deprecated JobID getJobId(); Review Comment: Good point! For these deprecated methods in the modified interfaces, I've changed them from abstract to default and provided default implementations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]
WencongLiu commented on code in PR #23905: URL: https://github.com/apache/flink/pull/23905#discussion_r1440106462 ## flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java: ## @@ -42,6 +43,7 @@ import java.util.List; import java.util.concurrent.Future; +/** The test for flat map operator. */ Review Comment: These code-style clean-ups have been placed into the commit `[hotfix] Fix the lacking class comments for the tests using TaskInfo`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]
WencongLiu commented on code in PR #23905: URL: https://github.com/apache/flink/pull/23905#discussion_r1440106155 ## flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java: ## @@ -473,4 +520,26 @@ AggregatingState getAggregatingState( */ @PublicEvolving MapState getMapState(MapStateDescriptor stateProperties); + +/** + * Get the meta information of current job. + * + * @return the job meta information. + */ +@PublicEvolving +@Nullable +default JobInfo getJobInfo() { +return null; +} + +/** + * Get the meta information of current task. + * + * @return the task meta information. + */ +@PublicEvolving +@Nullable +default TaskInfo getTaskInfo() { +return null; +} Review Comment: I've changed the default methods `getJobInfo()` and `getTaskInfo()` to be abstract. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33941][table-planner] use field index instead of field name about window time column [flink]
LadyForest commented on code in PR #23991: URL: https://github.com/apache/flink/pull/23991#discussion_r1440106137 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateProjectMergeRule.java: ## @@ -18,6 +18,8 @@ package org.apache.flink.table.planner.plan.rules.logical; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.planner.plan.logical.LogicalWindow; Review Comment: Nit: update the FLINK MODIFICATION line number mentioned at L#59 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]
WencongLiu commented on code in PR #23905: URL: https://github.com/apache/flink/pull/23905#discussion_r1440105816 ## flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java: ## @@ -166,7 +169,10 @@ public class Task /** ID which identifies the slot in which the task is supposed to run. */ private final AllocationID allocationId; -/** TaskInfo object for this task. */ +/** The meta information of current job. */ +private final JobInfo jobInfo; Review Comment: These code changes are a prerequisite for FLIP-382. I've moved them to the commit `[FLINK-33905][core] Add getJobInfo() method to Environment`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]
WencongLiu commented on code in PR #23905: URL: https://github.com/apache/flink/pull/23905#discussion_r1440105629 ## flink-core/src/main/java/org/apache/flink/api/common/JobInfoImpl.java: ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** The default implementation of {@link JobInfo}. */ +@PublicEvolving Review Comment: The `JobInfoImpl` class has been annotated with `@Internal`. ## flink-core/src/main/java/org/apache/flink/api/common/TaskInfoImpl.java: ## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** The default implementation of {@link TaskInfo}. */ +@PublicEvolving Review Comment: The `TaskInfoImpl` class has been annotated with `@Internal`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33966] Fix the getNumRecordsInPerSecond Utility Function [flink-kubernetes-operator]
1996fanrui commented on code in PR #743: URL: https://github.com/apache/flink-kubernetes-operator/pull/743#discussion_r1440105061 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java: ## @@ -221,7 +221,7 @@ private static double getNumRecordsInPerSecond( numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC); } -if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) { +if (!isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) { numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC); } Review Comment: By the way, the current change doesn't make sense. When a task isn't a source, it doesn't have the `SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC` metric. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33966] Fix the getNumRecordsInPerSecond Utility Function [flink-kubernetes-operator]
1996fanrui commented on code in PR #743: URL: https://github.com/apache/flink-kubernetes-operator/pull/743#discussion_r1440104568 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java: ## @@ -221,7 +221,7 @@ private static double getNumRecordsInPerSecond( numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC); } -if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) { +if (!isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) { numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC); } Review Comment: Thanks @Yang-LI-CS for the PR! These two conditions are indeed the same, which may be unexpected intuitively. Maybe the logic here is to derive numRecordsInPerSecond from three metrics: 1. FlinkMetric.NUM_RECORDS_IN_PER_SEC 2. FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC 3. FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC When 1 is unavailable, get it from 2. When 2 is unavailable, get it from 3. I'm not sure about it. Need to confirm with @mxm. - If my guess is right, it's not a bug, and I suggest this PR add a brief comment to clarify it. - If my guess is wrong, it's a bug, and we can fix it. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
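If that reading is right, the intended logic would look something like the following sketch — not the operator's actual code, just the hypothesized fallback chain; `AggregatedMetric` and `FlinkMetric` are the types used in the diff above, and the fragment assumes a `java.util.Map` import:

```java
private static AggregatedMetric numRecordsInRate(
        Map<FlinkMetric, AggregatedMetric> flinkMetrics, boolean isSource) {
    // (1) the generic per-task input rate
    AggregatedMetric rate = flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
    if (isSource && (rate == null || rate.getSum() == 0)) {
        // (2) fall back to the source task's dedicated input rate
        rate = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
    }
    if (isSource && (rate == null || rate.getSum() == 0)) {
        // (3) last resort: approximate the input by the source task's output rate
        rate = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
    }
    return rate;
}
```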
[jira] [Resolved] (FLINK-33964) Flink documentation can't be built due to error in Pulsar docs
[ https://issues.apache.org/jira/browse/FLINK-33964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu resolved FLINK-33964. Resolution: Fixed > Flink documentation can't be built due to error in Pulsar docs > -- > > Key: FLINK-33964 > URL: https://issues.apache.org/jira/browse/FLINK-33964 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar, Documentation >Affects Versions: pulsar-4.1.0 >Reporter: Martijn Visser >Assignee: Leonard Xu >Priority: Blocker > Labels: pull-request-available > Fix For: pulsar-4.1.1 > > > https://github.com/apache/flink/actions/runs/7380766702/job/20078487743 > {code:java} > Start building sites … > hugo v0.110.0-e32a493b7826d02763c3b79623952e625402b168+extended linux/amd64 > BuildDate=2023-01-17T12:16:09Z VendorInfo=gohugoio > Error: Error building site: > "/root/flink/docs/themes/connectors/content.zh/docs/connectors/datastream/pulsar.md:491:1": > failed to extract shortcode: template for shortcode > "generated/pulsar_admin_configuration" not found > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33964) Flink documentation can't be built due to error in Pulsar docs
[ https://issues.apache.org/jira/browse/FLINK-33964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802021#comment-17802021 ] Leonard Xu commented on FLINK-33964: Fixed in flink-connector-pulsar via main: 2be651a043b66936ec787033a851ca8c9d7cf95e v4.1: 05e58b74947dcf46583e7fd1565d459c7b4a44a2 > Flink documentation can't be built due to error in Pulsar docs > -- > > Key: FLINK-33964 > URL: https://issues.apache.org/jira/browse/FLINK-33964 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar, Documentation >Affects Versions: pulsar-4.1.0 >Reporter: Martijn Visser >Assignee: Leonard Xu >Priority: Blocker > Labels: pull-request-available > Fix For: pulsar-4.1.1 > > > https://github.com/apache/flink/actions/runs/7380766702/job/20078487743 > {code:java} > Start building sites … > hugo v0.110.0-e32a493b7826d02763c3b79623952e625402b168+extended linux/amd64 > BuildDate=2023-01-17T12:16:09Z VendorInfo=gohugoio > Error: Error building site: > "/root/flink/docs/themes/connectors/content.zh/docs/connectors/datastream/pulsar.md:491:1": > failed to extract shortcode: template for shortcode > "generated/pulsar_admin_configuration" not found > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33941][table-planner] use field index instead of field name about window time column [flink]
LadyForest commented on code in PR #23991: URL: https://github.com/apache/flink/pull/23991#discussion_r1437490488 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala: ## @@ -1117,6 +1117,7 @@ object AggregateUtil extends Enumeration { } /** Compute field index of given timeField expression. */ + @Deprecated Review Comment: Can we just remove this method? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33964) Flink documentation can't be built due to error in Pulsar docs
[ https://issues.apache.org/jira/browse/FLINK-33964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu updated FLINK-33964: --- Fix Version/s: pulsar-4.1.1 > Flink documentation can't be built due to error in Pulsar docs > -- > > Key: FLINK-33964 > URL: https://issues.apache.org/jira/browse/FLINK-33964 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar, Documentation >Affects Versions: 1.18.0 >Reporter: Martijn Visser >Assignee: Leonard Xu >Priority: Blocker > Labels: pull-request-available > Fix For: pulsar-4.1.1 > > > https://github.com/apache/flink/actions/runs/7380766702/job/20078487743 > {code:java} > Start building sites … > hugo v0.110.0-e32a493b7826d02763c3b79623952e625402b168+extended linux/amd64 > BuildDate=2023-01-17T12:16:09Z VendorInfo=gohugoio > Error: Error building site: > "/root/flink/docs/themes/connectors/content.zh/docs/connectors/datastream/pulsar.md:491:1": > failed to extract shortcode: template for shortcode > "generated/pulsar_admin_configuration" not found > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33964) Flink documentation can't be built due to error in Pulsar docs
[ https://issues.apache.org/jira/browse/FLINK-33964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu updated FLINK-33964: --- Affects Version/s: pulsar-4.1.0 (was: 1.18.0) > Flink documentation can't be built due to error in Pulsar docs > -- > > Key: FLINK-33964 > URL: https://issues.apache.org/jira/browse/FLINK-33964 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar, Documentation >Affects Versions: pulsar-4.1.0 >Reporter: Martijn Visser >Assignee: Leonard Xu >Priority: Blocker > Labels: pull-request-available > Fix For: pulsar-4.1.1 > > > https://github.com/apache/flink/actions/runs/7380766702/job/20078487743 > {code:java} > Start building sites … > hugo v0.110.0-e32a493b7826d02763c3b79623952e625402b168+extended linux/amd64 > BuildDate=2023-01-17T12:16:09Z VendorInfo=gohugoio > Error: Error building site: > "/root/flink/docs/themes/connectors/content.zh/docs/connectors/datastream/pulsar.md:491:1": > failed to extract shortcode: template for shortcode > "generated/pulsar_admin_configuration" not found > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33964) Flink documentation can't be built due to error in Pulsar docs
[ https://issues.apache.org/jira/browse/FLINK-33964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu updated FLINK-33964: --- Affects Version/s: 1.18.0 > Flink documentation can't be built due to error in Pulsar docs > -- > > Key: FLINK-33964 > URL: https://issues.apache.org/jira/browse/FLINK-33964 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar, Documentation >Affects Versions: 1.18.0 >Reporter: Martijn Visser >Assignee: Leonard Xu >Priority: Blocker > Labels: pull-request-available > > https://github.com/apache/flink/actions/runs/7380766702/job/20078487743 > {code:java} > Start building sites … > hugo v0.110.0-e32a493b7826d02763c3b79623952e625402b168+extended linux/amd64 > BuildDate=2023-01-17T12:16:09Z VendorInfo=gohugoio > Error: Error building site: > "/root/flink/docs/themes/connectors/content.zh/docs/connectors/datastream/pulsar.md:491:1": > failed to extract shortcode: template for shortcode > "generated/pulsar_admin_configuration" not found > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33964) Flink documentation can't be built due to error in Pulsar docs
[ https://issues.apache.org/jira/browse/FLINK-33964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802019#comment-17802019 ] Leonard Xu commented on FLINK-33964: The documentation build action has been fixed: https://github.com/apache/flink/actions/runs/7391764752/job/20114595324 > Flink documentation can't be built due to error in Pulsar docs > -- > > Key: FLINK-33964 > URL: https://issues.apache.org/jira/browse/FLINK-33964 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar, Documentation >Reporter: Martijn Visser >Assignee: Leonard Xu >Priority: Blocker > Labels: pull-request-available > > https://github.com/apache/flink/actions/runs/7380766702/job/20078487743 > {code:java} > Start building sites … > hugo v0.110.0-e32a493b7826d02763c3b79623952e625402b168+extended linux/amd64 > BuildDate=2023-01-17T12:16:09Z VendorInfo=gohugoio > Error: Error building site: > "/root/flink/docs/themes/connectors/content.zh/docs/connectors/datastream/pulsar.md:491:1": > failed to extract shortcode: template for shortcode > "generated/pulsar_admin_configuration" not found > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
AncyRominus commented on code in PR #733: URL: https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1440100481 ## flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java: ## @@ -17,43 +17,88 @@ package org.apache.flink.kubernetes.operator.admission.mutator; +import org.apache.flink.kubernetes.operator.admission.informer.InformerManager; import org.apache.flink.kubernetes.operator.api.CrdConstants; +import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; +import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator; import com.fasterxml.jackson.databind.ObjectMapper; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.informers.cache.Cache; import io.javaoperatorsdk.webhook.admission.NotAllowedException; import io.javaoperatorsdk.webhook.admission.Operation; import io.javaoperatorsdk.webhook.admission.mutation.Mutator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; +import java.util.Optional; +import java.util.Set; /** The default mutator. */ public class FlinkMutator implements Mutator<HasMetadata> { private static final Logger LOG = LoggerFactory.getLogger(FlinkMutator.class); private static final ObjectMapper mapper = new ObjectMapper(); +private final Set<FlinkResourceMutator> mutators; +private final InformerManager informerManager; + +public FlinkMutator(Set<FlinkResourceMutator> mutators, InformerManager informerManager) { +this.mutators = mutators; +this.informerManager = informerManager; +} @Override public HasMetadata mutate(HasMetadata resource, Operation operation) throws NotAllowedException { -if (operation == Operation.CREATE) { +if (operation == Operation.CREATE || operation == Operation.UPDATE) { LOG.debug("Mutating resource {}", resource); - if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) { -try { -var sessionJob = mapper.convertValue(resource, FlinkSessionJob.class); -setSessionTargetLabel(sessionJob); -return sessionJob; -} catch (Exception e) { -throw new RuntimeException(e); -} +return mutateSessionJob(resource); +} +if (CrdConstants.KIND_FLINK_DEPLOYMENT.equals(resource.getKind())) { +return mutateDeployment(resource); } } return resource; } +private FlinkSessionJob mutateSessionJob(HasMetadata resource) { +try { +var sessionJob = mapper.convertValue(resource, FlinkSessionJob.class); +var namespace = sessionJob.getMetadata().getNamespace(); +var deploymentName = sessionJob.getSpec().getDeploymentName(); +var key = Cache.namespaceKeyFunc(namespace, deploymentName); +var deployment = + informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key); + +setSessionTargetLabel(sessionJob); Review Comment: Thanks @gyfora ! Yes, that's a good point. I have added the logic for setting the session target label as part of the default mutator logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
AncyRominus commented on code in PR #733: URL: https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1440100481 ## flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java: ## @@ -17,43 +17,88 @@ package org.apache.flink.kubernetes.operator.admission.mutator; +import org.apache.flink.kubernetes.operator.admission.informer.InformerManager; import org.apache.flink.kubernetes.operator.api.CrdConstants; +import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; +import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator; import com.fasterxml.jackson.databind.ObjectMapper; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.informers.cache.Cache; import io.javaoperatorsdk.webhook.admission.NotAllowedException; import io.javaoperatorsdk.webhook.admission.Operation; import io.javaoperatorsdk.webhook.admission.mutation.Mutator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; +import java.util.Optional; +import java.util.Set; /** The default mutator. */ public class FlinkMutator implements Mutator<HasMetadata> { private static final Logger LOG = LoggerFactory.getLogger(FlinkMutator.class); private static final ObjectMapper mapper = new ObjectMapper(); +private final Set<FlinkResourceMutator> mutators; +private final InformerManager informerManager; + +public FlinkMutator(Set<FlinkResourceMutator> mutators, InformerManager informerManager) { +this.mutators = mutators; +this.informerManager = informerManager; +} @Override public HasMetadata mutate(HasMetadata resource, Operation operation) throws NotAllowedException { -if (operation == Operation.CREATE) { +if (operation == Operation.CREATE || operation == Operation.UPDATE) { LOG.debug("Mutating resource {}", resource); - if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) { -try { -var sessionJob = mapper.convertValue(resource, FlinkSessionJob.class); -setSessionTargetLabel(sessionJob); -return sessionJob; -} catch (Exception e) { -throw new RuntimeException(e); -} +return mutateSessionJob(resource); +} +if (CrdConstants.KIND_FLINK_DEPLOYMENT.equals(resource.getKind())) { +return mutateDeployment(resource); } } return resource; } +private FlinkSessionJob mutateSessionJob(HasMetadata resource) { +try { +var sessionJob = mapper.convertValue(resource, FlinkSessionJob.class); +var namespace = sessionJob.getMetadata().getNamespace(); +var deploymentName = sessionJob.getSpec().getDeploymentName(); +var key = Cache.namespaceKeyFunc(namespace, deploymentName); +var deployment = + informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key); + +setSessionTargetLabel(sessionJob); Review Comment: Thanks @gyfora ! Added the logic for setting the session target label as part of the default mutator logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33884] Update Pulsar dependency to 3.0.2 in Pulsar Connector [flink-connector-pulsar]
dchristle commented on PR #72: URL: https://github.com/apache/flink-connector-pulsar/pull/72#issuecomment-1874876697 > @dchristle I've triggered the CI, but I'm expecting that upgrading the version will also have impact on the license checks (the listed versions in the NOTICE files) Got it. Yes, I see the CI check for the NOTICE file failed due to out-of-date info. I updated the dependency versions in the META-INF NOTICE file. I also updated the year to 2024, in case the outdated year would also keep the CI check from passing. We can bump the year in a separate hotfix PR instead, if you'd prefer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] Update copyright NOTICE year to 2024 [flink]
1996fanrui merged PR #24018: URL: https://github.com/apache/flink/pull/24018 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33969] Implement restore tests for TableSourceScan node [flink]
flinkbot commented on PR #24020: URL: https://github.com/apache/flink/pull/24020#issuecomment-1874824806 ## CI report: * 32e25c7d0a6c58dc23ae28437f7159bf449bdce1 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33969) Implement restore tests for TableSourceScan node
[ https://issues.apache.org/jira/browse/FLINK-33969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33969: --- Labels: pull-request-available (was: ) > Implement restore tests for TableSourceScan node > > > Key: FLINK-33969 > URL: https://issues.apache.org/jira/browse/FLINK-33969 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Bonnie Varghese >Assignee: Bonnie Varghese >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-33969] Implement restore tests for TableSourceScan node [flink]
bvarghese1 opened a new pull request, #24020: URL: https://github.com/apache/flink/pull/24020 ## What is the purpose of the change *Add restore tests for TableSourceScan node* ## Verifying this change This change added tests and can be verified as follows: - Added restore tests for TableSourceScan node which verifies the generated compiled plan with the saved compiled plan. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-33969) Implement restore tests for TableSourceScan node
Bonnie Varghese created FLINK-33969: --- Summary: Implement restore tests for TableSourceScan node Key: FLINK-33969 URL: https://issues.apache.org/jira/browse/FLINK-33969 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Bonnie Varghese Assignee: Bonnie Varghese -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33814) Autoscaler Standalone control loop supports multiple threads
[ https://issues.apache.org/jira/browse/FLINK-33814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33814: --- Labels: pull-request-available (was: ) > Autoscaler Standalone control loop supports multiple threads > --- > > Key: FLINK-33814 > URL: https://issues.apache.org/jira/browse/FLINK-33814 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > When the job list has a lot of jobs, a single thread isn't enough. > Supporting multiple threads in the Autoscaler Standalone control loop is therefore very useful > for massive production; it's similar to > kubernetes.operator.reconcile.parallelism. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-33814][autoscaler] Autoscaler Standalone control loop supports multiple threads [flink-kubernetes-operator]
1996fanrui opened a new pull request, #744: URL: https://github.com/apache/flink-kubernetes-operator/pull/744 ## What is the purpose of the change When the job list has a lot of jobs, a single thread isn't enough, so supporting multiple threads in the Autoscaler Standalone control loop is very useful for massive production; it's similar to `kubernetes.operator.reconcile.parallelism`. ## Brief change log Introduce `autoscaler.standalone.control-loop.parallelism`. ## Verifying this change - Added `StandaloneAutoscalerExecutorTest#testScalingParallelism` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., are there any changes to the `CustomResourceDescriptors`: no - Core observer or reconciler logic that is regularly executed: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? docs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
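For context, a minimal self-contained sketch of the kind of multi-threaded control loop this option enables; only the option name `autoscaler.standalone.control-loop.parallelism` comes from the PR above, everything else is hypothetical:
{code:java}
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ControlLoopSketch {
    public static void main(String[] args) {
        // Value that would come from autoscaler.standalone.control-loop.parallelism.
        int parallelism = 4;
        ExecutorService scalingPool = Executors.newFixedThreadPool(parallelism);
        // Each fetched job is scaled on the pool instead of a single thread.
        for (String job : List.of("job-a", "job-b", "job-c")) {
            scalingPool.execute(() -> System.out.println("scaling " + job));
        }
        scalingPool.shutdown();
    }
}
{code}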
[jira] [Resolved] (FLINK-33965) Refactor the configuration for autoscaler standalone
[ https://issues.apache.org/jira/browse/FLINK-33965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan resolved FLINK-33965. - Fix Version/s: kubernetes-operator-1.8.0 Resolution: Fixed > Refactor the configuration for autoscaler standalone > > > Key: FLINK-33965 > URL: https://issues.apache.org/jira/browse/FLINK-33965 > Project: Flink > Issue Type: Sub-task >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > Currently, all configurations of autoscaler standalone are maintained as string keys. > When autoscaler standalone had only a few options, this was easy to maintain. > However, it becomes hard to maintain as more options are added: developing the JDBC autoscaler > state store and the multi-threaded control loop will introduce more options. > h2. Solution: > Introduce AutoscalerStandaloneOptions to manage all options of > autoscaler standalone, and generate the docs for it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-30535] Introduce TTL state based benchmarks [flink-benchmarks]
Zakelly commented on PR #83: URL: https://github.com/apache/flink-benchmarks/pull/83#issuecomment-1874802825 @Myasuka A kind reminder~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33965) Refactor the configuration for autoscaler standalone
[ https://issues.apache.org/jira/browse/FLINK-33965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801998#comment-17801998 ] Rui Fan commented on FLINK-33965: - Merged to main (1.8.0) via: fa938439ad8a7601462a3be268077aa4078881bc and 3cae5907d3a5c610a1aa8e5d41095ddbcba676ef > Refactor the configuration for autoscaler standalone > > > Key: FLINK-33965 > URL: https://issues.apache.org/jira/browse/FLINK-33965 > Project: Flink > Issue Type: Sub-task >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available > > Currently, all configurations of autoscaler standalone are maintained as string keys. > When autoscaler standalone had only a few options, this was easy to maintain. > However, it becomes hard to maintain as more options are added: developing the JDBC autoscaler > state store and the multi-threaded control loop will introduce more options. > h2. Solution: > Introduce AutoscalerStandaloneOptions to manage all options of > autoscaler standalone, and generate the docs for it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33965][autoscaler] Introducing the AutoscalerStandaloneOptions to simplify options for autoscaler standalone [flink-kubernetes-operator]
1996fanrui merged PR #742: URL: https://github.com/apache/flink-kubernetes-operator/pull/742 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33968][runtime] Advance the calculation of num of subpartitions to the time of initializing execution job vertex [flink]
flinkbot commented on PR #24019: URL: https://github.com/apache/flink/pull/24019#issuecomment-1874793432 ## CI report: * 433585f7f7fb48fe903a8beafce86e0bfec5d776 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33968][runtime] Advance the calculation of num of subpartitions to the time of initializing execution job vertex [flink]
wanglijie95 commented on PR #24019: URL: https://github.com/apache/flink/pull/24019#issuecomment-1874792567 @JunRuiLee @zhuzhurk Could you help review this PR? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33968) Compute the number of subpartitions when initializing execution job vertices
[ https://issues.apache.org/jira/browse/FLINK-33968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33968: --- Labels: pull-request-available (was: ) > Compute the number of subpartitions when initializing execution job vertices > --- > > Key: FLINK-33968 > URL: https://issues.apache.org/jira/browse/FLINK-33968 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Lijie Wang >Assignee: Lijie Wang >Priority: Major > Labels: pull-request-available > > Currently, when using dynamic graphs, the subpartition-num of a task is > calculated lazily, only at the task deployment moment, which may lead to some > uncertainties in job recovery scenarios: > Before the JM crashes, when deploying upstream tasks, the parallelism of the > downstream vertex may be unknown, so the subpartition-num will be the max > parallelism of the downstream job vertex. However, after the JM restarts, when > deploying upstream tasks, the parallelism of the downstream job vertex may be > known (it has been calculated before the JM crashed and recovered after the JM > restarts), so the subpartition-num will be the actual parallelism of > the downstream job vertex. This difference in the calculated subpartition-num means > that partitions generated before the JM crashed cannot be reused after the JM > restarts. > We will solve this problem by advancing the calculation of the subpartition-num > to the moment of initializing the execution job vertex (in the ctor of > IntermediateResultPartition) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32958) Support VIEW as a source table in CREATE TABLE ... Like statement
[ https://issues.apache.org/jira/browse/FLINK-32958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801993#comment-17801993 ] Han commented on FLINK-32958: - [~martijnvisser] sure. spark doc: [https://spark.apache.org/docs/latest/sql-ref-syntax-ddl-create-table-like.html] > The {{CREATE TABLE}} statement defines a new table using the > definition/metadata of an existing table or {*}view{*}. hive doc: [https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-CreateTableLike] > Before Hive 0.8.0, CREATE TABLE LIKE view_name would make a copy of the view. >In Hive 0.8.0 and later releases, CREATE TABLE LIKE view_name creates a table >by adopting the schema of view_name (fields and partition columns) using >defaults for SerDe and file formats. They both support the "CREATE TABLE ... LIKE view" syntax. > Support VIEW as a source table in CREATE TABLE ... Like statement > - > > Key: FLINK-32958 > URL: https://issues.apache.org/jira/browse/FLINK-32958 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.17.1 >Reporter: Han >Priority: Major > Labels: pull-request-available > > We can't create a table from a view through a CREATE TABLE ... LIKE statement > > case 1: > {code:sql} > create view source_view as select id,val from source; > create table sink with ('connector' = 'print') like source_view (excluding > all); > insert into sink select * from source_view;{code} > case 2 > {code:java} > DataStreamSource source = ...; > tEnv.createTemporaryView("source", source); > tEnv.executeSql("create table sink with ('connector' = 'print') like source > (excluding all)"); > tEnv.executeSql("insert into sink select * from source");{code} > > The above cases will throw an exception: > {code:java} > Source table '`default_catalog`.`default_database`.`source`' of the LIKE > clause can not be a VIEW{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-33968][runtime] Advance the calculation of num of subpartitions to the time of initializing execution job vertex [flink]
wanglijie95 opened a new pull request, #24019: URL: https://github.com/apache/flink/pull/24019 ## What is the purpose of the change Solve FLINK-33968 ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**) - The serializers: (**no**) - The runtime per-record code paths (performance sensitive): (**no**) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (**no**) - The S3 file system connector: (**no**) ## Documentation - Does this pull request introduce a new feature? (**no**) - If yes, how is the feature documented? (**not applicable**) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space
[ https://issues.apache.org/jira/browse/FLINK-33940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801992#comment-17801992 ] Rui Fan commented on FLINK-33940: - Hi [~Zhanghao Chen], IIUC flink-benchmarks[1] checks the performance related to state, and we can see the performance changes in this web UI[2]. For example, valueGet.HEAP[3][4] measures ValueState reads for the hashmap state backend. !image-2024-01-03-10-52-05-861.png|width=949,height=359! [1] [https://github.com/apache/flink-benchmarks] [2][http://flink-speed.xyz|http://flink-speed.xyz/] [3][https://github.com/apache/flink-benchmarks/blob/master/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java] [4]http://flink-speed.xyz/timeline/?ben=valueGet.HEAP&env=3 > Update the auto-derivation rule of max parallelism for enlarged upscaling > space > --- > > Key: FLINK-33940 > URL: https://issues.apache.org/jira/browse/FLINK-33940 > Project: Flink > Issue Type: Improvement > Components: API / Core >Reporter: Zhanghao Chen >Assignee: Zhanghao Chen >Priority: Major > Attachments: image-2024-01-03-10-52-05-861.png > > > *Background* > The choice of the max parallelism of a stateful operator is important, as it > limits the upper bound of the parallelism of the operator while it can also > add extra overhead when set too large. Currently, the max parallelism > of an operator is either fixed to a value specified by the API core / pipeline > option or auto-derived with the following rule: > {{min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)}} > *Problem* > Recently, the elasticity of Flink jobs has become more and more valued by > users. The current auto-derived max parallelism was introduced a long time > ago and only allows the operator parallelism to be roughly doubled, which is > not desired for elasticity. Setting a max parallelism manually may not be > desirable either: users may not have sufficient expertise to select a good > max-parallelism value. > *Proposal* > Update the auto-derivation rule of max parallelism to derive a larger max > parallelism for a better elasticity experience out of the box. A candidate is > as follows: > {{min(max(roundUpToPowerOfTwo(operatorParallelism * {*}5{*}), {*}1024{*}), > 32767)}} > Looking forward to your opinions on this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
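For readers unfamiliar with the benchmark, a minimal sketch of the access pattern that valueGet exercises, i.e., per-key ValueState reads in a keyed operator; this class is illustrative and not part of the benchmark harness:
{code:java}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Counts records per key using keyed ValueState; the state read in
// flatMap() is the kind of operation the valueGet benchmark measures.
public class CountPerKey extends RichFlatMapFunction<String, Long> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(String key, Collector<Long> out) throws Exception {
        Long current = count.value(); // the per-key state read
        long next = current == null ? 1L : current + 1L;
        count.update(next);
        out.collect(next);
    }
}
{code}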
[jira] [Updated] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space
[ https://issues.apache.org/jira/browse/FLINK-33940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan updated FLINK-33940: Attachment: image-2024-01-03-10-52-05-861.png > Update the auto-derivation rule of max parallelism for enlarged upscaling > space > --- > > Key: FLINK-33940 > URL: https://issues.apache.org/jira/browse/FLINK-33940 > Project: Flink > Issue Type: Improvement > Components: API / Core >Reporter: Zhanghao Chen >Assignee: Zhanghao Chen >Priority: Major > Attachments: image-2024-01-03-10-52-05-861.png > > > *Background* > The choice of the max parallelism of a stateful operator is important, as it > limits the upper bound of the parallelism of the operator while it can also > add extra overhead when set too large. Currently, the max parallelism > of an operator is either fixed to a value specified by the API core / pipeline > option or auto-derived with the following rule: > {{min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)}} > *Problem* > Recently, the elasticity of Flink jobs has become more and more valued by > users. The current auto-derived max parallelism was introduced a long time > ago and only allows the operator parallelism to be roughly doubled, which is > not desired for elasticity. Setting a max parallelism manually may not be > desirable either: users may not have sufficient expertise to select a good > max-parallelism value. > *Proposal* > Update the auto-derivation rule of max parallelism to derive a larger max > parallelism for a better elasticity experience out of the box. A candidate is > as follows: > {{min(max(roundUpToPowerOfTwo(operatorParallelism * {*}5{*}), {*}1024{*}), > 32767)}} > Looking forward to your opinions on this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33968) Compute the number of subpartitions when initializing execution job vertices
[ https://issues.apache.org/jira/browse/FLINK-33968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lijie Wang updated FLINK-33968: --- Description: Currently, when using dynamic graphs, the subpartition-num of a task is calculated lazily, only at the task deployment moment, which may lead to some uncertainties in job recovery scenarios: Before the JM crashes, when deploying upstream tasks, the parallelism of the downstream vertex may be unknown, so the subpartition-num will be the max parallelism of the downstream job vertex. However, after the JM restarts, when deploying upstream tasks, the parallelism of the downstream job vertex may be known (it has been calculated before the JM crashed and recovered after the JM restarts), so the subpartition-num will be the actual parallelism of the downstream job vertex. This difference in the calculated subpartition-num means that partitions generated before the JM crashed cannot be reused after the JM restarts. We will solve this problem by advancing the calculation of the subpartition-num to the moment of initializing the execution job vertex (in the ctor of IntermediateResultPartition) was: Currently, when using dynamic graphs, the subpartition-num of a task is calculated lazily, only at the task deployment moment, which may lead to some uncertainties in job recovery scenarios. Before the JM crashes, when deploying upstream tasks, the parallelism of the downstream vertex may be unknown, so the subpartition-num will be the max parallelism of the downstream job vertex. However, after the JM restarts, when deploying upstream tasks, the parallelism of the downstream job vertex may be known (it has been calculated before the JM crashed and recovered after the JM restarts), so the subpartition-num will be the actual parallelism of the downstream job vertex. This difference in the calculated subpartition-num means that partitions generated before the JM crashed cannot be reused after the JM restarts. We will solve this problem by advancing the calculation of the subpartition-num to the moment of initializing the execution job vertex (in the ctor of IntermediateResultPartition) > Compute the number of subpartitions when initializing execution job vertices > --- > > Key: FLINK-33968 > URL: https://issues.apache.org/jira/browse/FLINK-33968 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Lijie Wang >Assignee: Lijie Wang >Priority: Major > > Currently, when using dynamic graphs, the subpartition-num of a task is > calculated lazily, only at the task deployment moment, which may lead to some > uncertainties in job recovery scenarios: > Before the JM crashes, when deploying upstream tasks, the parallelism of the > downstream vertex may be unknown, so the subpartition-num will be the max > parallelism of the downstream job vertex. However, after the JM restarts, when > deploying upstream tasks, the parallelism of the downstream job vertex may be > known (it has been calculated before the JM crashed and recovered after the JM > restarts), so the subpartition-num will be the actual parallelism of > the downstream job vertex. This difference in the calculated subpartition-num will > lead to the partitions generated before the JM crashed not being reusable after the JM > restarts. > We will solve this problem by advancing the calculation of the subpartition-num > to the moment of initializing the execution job vertex (in the ctor of > IntermediateResultPartition) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33968) Compute the number of subpartitions when initializing execution job vertices
Lijie Wang created FLINK-33968: -- Summary: Compute the number of subpartitions when initializing execution job vertices Key: FLINK-33968 URL: https://issues.apache.org/jira/browse/FLINK-33968 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Lijie Wang Assignee: Lijie Wang Currently, when using dynamic graphs, the subpartition-num of a task is calculated lazily, only at the task deployment moment, which may lead to some uncertainties in job recovery scenarios. Before the JM crashes, when deploying upstream tasks, the parallelism of the downstream vertex may be unknown, so the subpartition-num will be the max parallelism of the downstream job vertex. However, after the JM restarts, when deploying upstream tasks, the parallelism of the downstream job vertex may be known (it has been calculated before the JM crashed and recovered after the JM restarts), so the subpartition-num will be the actual parallelism of the downstream job vertex. This difference in the calculated subpartition-num means that partitions generated before the JM crashed cannot be reused after the JM restarts. We will solve this problem by advancing the calculation of the subpartition-num to the moment of initializing the execution job vertex (in the ctor of IntermediateResultPartition) -- This message was sent by Atlassian Jira (v8.20.10#820010)
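A hypothetical illustration of the uncertainty described above (the names below are invented, not Flink's): while the downstream parallelism is undecided, the subpartition count falls back to the downstream max parallelism, so computing it lazily at deployment time can yield different values before and after a JobManager failover:
{code:java}
public class SubpartitionNumSketch {
    // Use the decided downstream parallelism if available, otherwise the max parallelism.
    static int numberOfSubpartitions(int decidedDownstreamParallelism, int downstreamMaxParallelism) {
        boolean parallelismDecided = decidedDownstreamParallelism > 0;
        return parallelismDecided ? decidedDownstreamParallelism : downstreamMaxParallelism;
    }

    public static void main(String[] args) {
        // Before failover: downstream parallelism unknown (-1) -> use max parallelism.
        System.out.println(numberOfSubpartitions(-1, 128)); // 128
        // After failover: downstream parallelism recovered as 32 -> a different count,
        // so the previously produced partitions cannot be reused.
        System.out.println(numberOfSubpartitions(32, 128)); // 32
    }
}
{code}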
[jira] [Closed] (FLINK-33949) METHOD_ABSTRACT_NOW_DEFAULT should be both source compatible and binary compatible
[ https://issues.apache.org/jira/browse/FLINK-33949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wencong Liu closed FLINK-33949. --- Resolution: Not A Problem > METHOD_ABSTRACT_NOW_DEFAULT should be both source compatible and binary > compatible > -- > > Key: FLINK-33949 > URL: https://issues.apache.org/jira/browse/FLINK-33949 > Project: Flink > Issue Type: Bug > Components: Test Infrastructure >Affects Versions: 1.19.0 >Reporter: Wencong Liu >Priority: Major > Fix For: 1.19.0 > > > Currently I'm trying to refactor some APIs annotated with @Public in > [FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs - > Apache Flink - Apache Software > Foundation|https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs]. > When an abstract method is changed into a default method, the japicmp maven > plugin names this change METHOD_ABSTRACT_NOW_DEFAULT and considers it source > incompatible and binary incompatible. > The reason may be that if the abstract method becomes default, the logic in > the default method will be ignored by the previous implementations. > I created a test case in which a job is compiled with the newly changed default > method and submitted to the previous version; no exception is thrown. > Therefore, METHOD_ABSTRACT_NOW_DEFAULT shouldn't be considered incompatible for > either source or binary. We could add the following settings to override the > default values for binary and source compatibility, such as: > {code:java} > <overrideCompatibilityChangeParameters> > <overrideCompatibilityChangeParameter> > <compatibilityChange>METHOD_ABSTRACT_NOW_DEFAULT</compatibilityChange> > <binaryCompatible>true</binaryCompatible> > <sourceCompatible>true</sourceCompatible> > </overrideCompatibilityChangeParameter> > </overrideCompatibilityChangeParameters> > {code} > By the way, currently the master branch checks both source compatibility and > binary compatibility between minor versions. According to Flink's API > compatibility constraints, the master branch shouldn't check binary > compatibility. There is already jira FLINK-33009 to track it and we should > fix it as soon as possible. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-33892) FLIP-383: Support Job Recovery for Batch Jobs
[ https://issues.apache.org/jira/browse/FLINK-33892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lijie Wang reassigned FLINK-33892: -- Assignee: Junrui Li (was: Lijie Wang) > FLIP-383: Support Job Recovery for Batch Jobs > - > > Key: FLINK-33892 > URL: https://issues.apache.org/jira/browse/FLINK-33892 > Project: Flink > Issue Type: New Feature > Components: Runtime / Coordination >Reporter: Lijie Wang >Assignee: Junrui Li >Priority: Major > > This is the umbrella ticket for > [FLIP-383|https://cwiki.apache.org/confluence/display/FLINK/FLIP-383%3A+Support+Job+Recovery+for+Batch+Jobs] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33935][checkpoint] Improve the default value logic related to `execution.checkpointing.tolerable-failed-checkpoints` and `state.backend.type` [flink]
Zakelly commented on code in PR #23987: URL: https://github.com/apache/flink/pull/23987#discussion_r1440016826 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java: ## @@ -1040,8 +1041,9 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) { configuration .getOptional(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG) .ifPresent(this::enableChangelogStateBackend); -Optional.ofNullable(loadStateBackend(configuration, classLoader)) -.ifPresent(this::setStateBackend); +configuration +.getOptional(StateBackendOptions.STATE_BACKEND) +.ifPresent(conf -> setStateBackend(loadStateBackend(configuration, classLoader))); Review Comment: @1996fanrui Thanks for your explanation! I missed the point of `getOptional`, so I have no more questions. +1 from my side. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33949) METHOD_ABSTRACT_NOW_DEFAULT should be both source compatible and binary compatible
[ https://issues.apache.org/jira/browse/FLINK-33949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801988#comment-17801988 ] Wencong Liu commented on FLINK-33949: - Thanks for the explanation from [~chesnay]. Given that all the actively running code might throw related exceptions, it would be unreasonable to directly modify the rules of japicmp. If there's a specific interface that needs to break this rule, we should simply exclude that interface. This ticket can be closed now. > METHOD_ABSTRACT_NOW_DEFAULT should be both source compatible and binary > compatible > -- > > Key: FLINK-33949 > URL: https://issues.apache.org/jira/browse/FLINK-33949 > Project: Flink > Issue Type: Bug > Components: Test Infrastructure >Affects Versions: 1.19.0 >Reporter: Wencong Liu >Priority: Major > Fix For: 1.19.0 > > > Currently I'm trying to refactor some APIs annotated with @Public in > [FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs - > Apache Flink - Apache Software > Foundation|https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs]. > When an abstract method is changed into a default method, the japicmp maven > plugin names this change METHOD_ABSTRACT_NOW_DEFAULT and considers it source > incompatible and binary incompatible. > The reason may be that if the abstract method becomes default, the logic in > the default method will be ignored by the previous implementations. > I created a test case in which a job is compiled with the newly changed default > method and submitted to the previous version; no exception is thrown. > Therefore, METHOD_ABSTRACT_NOW_DEFAULT shouldn't be considered incompatible for > either source or binary. We could add the following settings to override the > default values for binary and source compatibility, such as: > {code:java} > <overrideCompatibilityChangeParameters> > <overrideCompatibilityChangeParameter> > <compatibilityChange>METHOD_ABSTRACT_NOW_DEFAULT</compatibilityChange> > <binaryCompatible>true</binaryCompatible> > <sourceCompatible>true</sourceCompatible> > </overrideCompatibilityChangeParameter> > </overrideCompatibilityChangeParameters> > {code} > By the way, currently the master branch checks both source compatibility and > binary compatibility between minor versions. According to Flink's API > compatibility constraints, the master branch shouldn't check binary > compatibility. There is already jira FLINK-33009 to track it and we should > fix it as soon as possible. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space
[ https://issues.apache.org/jira/browse/FLINK-33940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801987#comment-17801987 ] Zhanghao Chen commented on FLINK-33940: --- [~mxm] Thanks for the quick response. I'll follow up with some Nexmark benchmark results on the performance impact of bumping the number of key groups, for an informed decision. As for the factor, 10 sounds good to me; let's also see what [~fanrui] & [~gyfora] think about it. > Update the auto-derivation rule of max parallelism for enlarged upscaling > space > --- > > Key: FLINK-33940 > URL: https://issues.apache.org/jira/browse/FLINK-33940 > Project: Flink > Issue Type: Improvement > Components: API / Core >Reporter: Zhanghao Chen >Assignee: Zhanghao Chen >Priority: Major > > *Background* > The choice of the max parallelism of a stateful operator is important, as it > limits the upper bound of the parallelism of the operator while it can also > add extra overhead when set too large. Currently, the max parallelism > of an operator is either fixed to a value specified by the API core / pipeline > option or auto-derived with the following rule: > {{min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)}} > *Problem* > Recently, the elasticity of Flink jobs has become more and more valued by > users. The current auto-derived max parallelism was introduced a long time > ago and only allows the operator parallelism to be roughly doubled, which is > not desired for elasticity. Setting a max parallelism manually may not be > desirable either: users may not have sufficient expertise to select a good > max-parallelism value. > *Proposal* > Update the auto-derivation rule of max parallelism to derive a larger max > parallelism for a better elasticity experience out of the box. A candidate is > as follows: > {{min(max(roundUpToPowerOfTwo(operatorParallelism * {*}5{*}), {*}1024{*}), > 32767)}} > Looking forward to your opinions on this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
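To make the two rules concrete, a self-contained sketch implementing the formulas exactly as quoted in the ticket; the rounding helper uses the usual bit-twiddling power-of-two round-up, and all names are illustrative:
{code:java}
public class MaxParallelismSketch {
    // Round x up to the next power of two (for x >= 1).
    static int roundUpToPowerOfTwo(int x) {
        x = x - 1;
        x |= x >> 1;
        x |= x >> 2;
        x |= x >> 4;
        x |= x >> 8;
        x |= x >> 16;
        return x + 1;
    }

    // min(max(roundUpToPowerOfTwo(parallelism * factor), lowerBound), 32767)
    static int deriveMaxParallelism(int parallelism, double factor, int lowerBound) {
        int rounded = roundUpToPowerOfTwo((int) Math.ceil(parallelism * factor));
        return Math.min(Math.max(rounded, lowerBound), 32767);
    }

    public static void main(String[] args) {
        System.out.println(deriveMaxParallelism(100, 1.5, 128));  // current rule: 256
        System.out.println(deriveMaxParallelism(100, 5.0, 1024)); // proposed rule: 1024
    }
}
{code}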
Re: [PR] [FLINK-33792] Generate the same code for the same logic [flink]
zoudan commented on code in PR #23984: URL: https://github.com/apache/flink/pull/23984#discussion_r1440008761 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala: ## @@ -124,14 +124,27 @@ object CodeGenUtils { private val nameCounter = new AtomicLong - def newName(name: String): String = { -s"$name$$${nameCounter.getAndIncrement}" + def newName(context: CodeGeneratorContext = null, name: String): String = { Review Comment: Got it, removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-33966) Fix the getNumRecordsInPerSecond Utility Function
[ https://issues.apache.org/jira/browse/FLINK-33966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan reassigned FLINK-33966: --- Assignee: Yang LI (was: Li Yang) > Fix the getNumRecordsInPerSecond Utility Function > - > > Key: FLINK-33966 > URL: https://issues.apache.org/jira/browse/FLINK-33966 > Project: Flink > Issue Type: Bug > Components: Autoscaler >Affects Versions: kubernetes-operator-1.7.0 >Reporter: Yang LI >Assignee: Yang LI >Priority: Minor > Labels: pull-request-available > > We have the following code in the codebase > {code:java} > if (isSource && (numRecordsInPerSecond == null || > numRecordsInPerSecond.getSum() == 0)) { > numRecordsInPerSecond = > > flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC); > }{code} > {code:java} > if (isSource && (numRecordsInPerSecond == null || > numRecordsInPerSecond.getSum() == 0)) { > numRecordsInPerSecond = > > flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC); > }{code} > i.e., the same condition is checked twice. > > {*}Definition of done{*}: > Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} to {{if (!isSource && ...)}}. This addresses the > redundant check and ensures correct metric fetching for non-source operators. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-33966) Fix the getNumRecordsInPerSecond Utility Function
[ https://issues.apache.org/jira/browse/FLINK-33966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan reassigned FLINK-33966: --- Assignee: Li Yang > Fix the getNumRecordsInPerSecond Utility Function > - > > Key: FLINK-33966 > URL: https://issues.apache.org/jira/browse/FLINK-33966 > Project: Flink > Issue Type: Bug > Components: Autoscaler >Affects Versions: kubernetes-operator-1.7.0 >Reporter: Yang LI >Assignee: Li Yang >Priority: Minor > Labels: pull-request-available > > We have the following code in the codebase > {code:java} > if (isSource && (numRecordsInPerSecond == null || > numRecordsInPerSecond.getSum() == 0)) { > numRecordsInPerSecond = > > flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC); > }{code} > {code:java} > if (isSource && (numRecordsInPerSecond == null || > numRecordsInPerSecond.getSum() == 0)) { > numRecordsInPerSecond = > > flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC); > }{code} > i.e., the same condition is checked twice. > > {*}Definition of done{*}: > Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} to {{if (!isSource && ...)}}. This addresses the > redundant check and ensures correct metric fetching for non-source operators. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33792] Generate the same code for the same logic [flink]
zoudan commented on PR #23984: URL: https://github.com/apache/flink/pull/23984#issuecomment-1874770437 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33962) Chaining-agnostic OperatorID generation for improved state compatibility on parallelism change
[ https://issues.apache.org/jira/browse/FLINK-33962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801984#comment-17801984 ] Zhanghao Chen commented on FLINK-33962: --- Thanks for the quick response, I'll draft a FLIP and raise a discussion first. > Chaining-agnostic OperatorID generation for improved state compatibility on > parallelism change > -- > > Key: FLINK-33962 > URL: https://issues.apache.org/jira/browse/FLINK-33962 > Project: Flink > Issue Type: Improvement > Components: API / Core >Reporter: Zhanghao Chen >Priority: Major > > *Background* > Flink restores operator state from snapshots based on matching the > operatorIDs. Since Flink 1.2, {{StreamGraphHasherV2}} is used for operatorID > generation when no user-set uid exists. The generated OperatorID is > deterministic with respect to: > * node-local properties (the traverse ID in the BFS for the stream graph) > * chained output nodes > * input node hashes > *Problem* > The chaining behavior will affect state compatibility, as the generation of > the OperatorID of an op is dependent on its chained output nodes. For > example, a simple source->sink DAG with source and sink chained together is > state incompatible with an otherwise identical DAG with source and sink > unchained (either because the parallelisms of the two ops are changed to be > unequal or chaining is disabled). This greatly limits the flexibility to > perform chain-breaking/joining for performance tuning. > *Proposal* > Introduce {{StreamGraphHasherV3}} that is agnostic to the chaining behavior > of operators, which effectively just removes L227-235 of > [flink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java > at master · apache/flink > (github.com)|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java]. > > This will not hurt the determinism of the ID generation across job > submissions as long as the stream graph topology doesn't change, and since new > versions of Flink have already adopted pure operator-level state recovery, > this will not break state recovery across job submissions as long as both > submissions use the same hasher. > This will, however, break cross-version state compatibility. So we can > introduce a new option to enable using HasherV3 in v1.19 and consider making > it the default hasher in v2.0. > Looking forward to suggestions on this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
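To see why the proposal helps, an illustrative-only sketch (this is not StreamGraphHasherV2's actual hash function): the V2-style variant mixes the chained outputs into the hash, so chaining or unchaining the same two operators changes the resulting ID, while the chaining-agnostic variant stays stable:
{code:java}
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.CRC32;

public class ChainAgnosticHashSketch {
    // Toy stand-in for an operator-ID hash: mixes node-local properties,
    // optionally the chained-output count, and the input hashes.
    static long hash(int traverseId, List<Long> inputHashes, int chainedOutputs, boolean includeChaining) {
        CRC32 crc = new CRC32();
        crc.update(Integer.toString(traverseId).getBytes(StandardCharsets.UTF_8));
        if (includeChaining) {
            // V2-style behavior: chained outputs perturb the hash.
            crc.update(Integer.toString(chainedOutputs).getBytes(StandardCharsets.UTF_8));
        }
        for (long inputHash : inputHashes) {
            crc.update(Long.toString(inputHash).getBytes(StandardCharsets.UTF_8));
        }
        return crc.getValue();
    }

    public static void main(String[] args) {
        // Same source operator, sink chained (1 chained output) vs unchained (0):
        System.out.println(hash(0, List.of(), 1, true) == hash(0, List.of(), 0, true));   // false -> state incompatible
        System.out.println(hash(0, List.of(), 1, false) == hash(0, List.of(), 0, false)); // true  -> compatible
    }
}
{code}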
Re: [PR] [BP][FLINK-33964][pulsar][docs] Drop admin docs reference [flink-connector-pulsar]
leonardBang merged PR #76: URL: https://github.com/apache/flink-connector-pulsar/pull/76 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33962) Chaining-agnostic OperatorID generation for improved state compatibility on parallelism change
[ https://issues.apache.org/jira/browse/FLINK-33962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801976#comment-17801976 ] Xintong Song commented on FLINK-33962: -- Thanks for reaching out, [~Zhanghao Chen]. Just some quick responses; I would need to look a bit more into the related components before giving further comments. Based on your description, in general I think it makes sense to make operator id generation independent from chaining. However, as you have already mentioned, this is a breaking change that may result in state incompatibility. Therefore, I think it deserves a FLIP discussion and an official vote. > Chaining-agnostic OperatorID generation for improved state compatibility on > parallelism change > -- > > Key: FLINK-33962 > URL: https://issues.apache.org/jira/browse/FLINK-33962 > Project: Flink > Issue Type: Improvement > Components: API / Core >Reporter: Zhanghao Chen >Priority: Major > > *Background* > Flink restores operator state from snapshots based on matching the > operatorIDs. Since Flink 1.2, {{StreamGraphHasherV2}} is used for operatorID > generation when no user-set uid exists. The generated OperatorID is > deterministic with respect to: > * node-local properties (the traverse ID in the BFS for the stream graph) > * chained output nodes > * input node hashes > *Problem* > The chaining behavior will affect state compatibility, as the generation of > the OperatorID of an operator depends on its chained output nodes. For > example, a simple source->sink DAG with source and sink chained together is > state incompatible with an otherwise identical DAG with source and sink > unchained (either because the parallelisms of the two ops are changed to be > unequal or chaining is disabled). This greatly limits the flexibility to > perform chain-breaking/joining for performance tuning. > *Proposal* > Introduce {{StreamGraphHasherV3}} that is agnostic to the chaining behavior > of operators, which effectively just removes L227-235 of > [flink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java > at master · apache/flink > (github.com)|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java]. > > This will not hurt the determinism of the ID generation across job > submissions as long as the stream graph topology doesn't change, and since new > versions of Flink have already adopted pure operator-level state recovery, > this will not break state recovery across job submissions as long as both > submissions use the same hasher. > This will, however, break cross-version state compatibility. So we can > introduce a new option to enable using HasherV3 in v1.19 and consider making > it the default hasher in v2.0. > Looking forward to suggestions on this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33967) Remove/Rename log4j2-test.properties in flink-streaming-java's test bundle
Koala Lam created FLINK-33967: - Summary: Remove/Rename log4j2-test.properties in flink-streaming-java's test bundle Key: FLINK-33967 URL: https://issues.apache.org/jira/browse/FLINK-33967 Project: Flink Issue Type: Improvement Reporter: Koala Lam This file on the test classpath is picked up automatically by Log4j2. In order to reliably use our own Log4j2 test config, we have to specify the system property "log4j2.configurationFile", which is not ideal as we have to set it manually in the IDE config. https://logging.apache.org/log4j/2.x/manual/configuration.html -- This message was sent by Atlassian Jira (v8.20.10#820010)
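As a workaround sketch (the file path and class name below are made-up assumptions), the config can also be pinned programmatically before the first logger is created, instead of via a per-IDE run configuration; `log4j2.configurationFile` is the standard Log4j2 system property referenced above.

{code:java}
// Sketch: pin the Log4j2 config before any logger initializes, so whichever
// log4j2-test.properties happens to be first on the classpath is not used.
// The path here is an assumption for illustration.
public class LoggingSetup {
    public static void main(String[] args) {
        System.setProperty("log4j2.configurationFile",
                "src/test/resources/my-log4j2-test.properties");
        org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(LoggingSetup.class);
        log.info("Logging configured from an explicit file");
    }
}
{code}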
[jira] [Updated] (FLINK-33966) Fix the getNumRecordsInPerSecond Utility Function
[ https://issues.apache.org/jira/browse/FLINK-33966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33966: --- Labels: pull-request-available (was: ) > Fix the getNumRecordsInPerSecond Utility Function > - > > Key: FLINK-33966 > URL: https://issues.apache.org/jira/browse/FLINK-33966 > Project: Flink > Issue Type: Bug > Components: Autoscaler >Affects Versions: kubernetes-operator-1.7.0 >Reporter: Yang LI >Priority: Minor > Labels: pull-request-available > > We have the following code in the codebase > {code:java} > if (isSource && (numRecordsInPerSecond == null || > numRecordsInPerSecond.getSum() == 0)) { > numRecordsInPerSecond = > > flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC); > }{code} > {code:java} > if (isSource && (numRecordsInPerSecond == null || > numRecordsInPerSecond.getSum() == 0)) { > numRecordsInPerSecond = > > flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC); > }{code} > i.e. the same condition is checked twice > > {*}Definition of done{*}: > Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} to {{if (!isSource && ...)}}. This addresses the redundant check and ensures correct metric fetching for non-source operators. -- This message was sent by Atlassian Jira (v8.20.10#820010)
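For clarity, a minimal sketch of the fix exactly as the definition of done describes it; the surrounding variables (isSource, numRecordsInPerSecond, flinkMetrics) are assumed to exist in the enclosing method, as in the quoted snippets, and this is not necessarily the final merged code.

{code:java}
// Sketch of the corrected branches per the definition of done above:
// the second lookup now applies to non-source vertices instead of
// repeating the isSource branch.
if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
    numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
}
if (!isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
    numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
}
{code}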
[PR] [FLINK-33966] Fix the getNumRecordsInPerSecond Utility Function [flink-kubernetes-operator]
Yang-LI-CS opened a new pull request, #743: URL: https://github.com/apache/flink-kubernetes-operator/pull/743 ## What is the purpose of the change Fix the getNumRecordsInPerSecond utility function to address the redundant check and ensure correct metric fetching for non-source operators. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., any changes to the `CustomResourceDescriptors`: no - Core observer or reconciler logic that is regularly executed: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33966) Fix the getNumRecordsInPerSecond Utility Function
[ https://issues.apache.org/jira/browse/FLINK-33966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yang LI updated FLINK-33966: Description: We have the following code in the codebase {code:java} if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) { numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC); }{code} {code:java} if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) { numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC); }{code} i.e. the same condition is checked twice {*}Definition of done{*}: Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} to {{if (!isSource && ...)}}. This addresses the redundant check and ensures correct metric fetching for non-source operators. was: We have the same condition check twice in the function getNumRecordsInPerSecond ([L220|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L220] and [L224|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L224]) {*}Definition of done{*}: Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} to {{if (!isSource && ...)}}. This addresses the redundant check and ensures correct metric fetching for non-source operators. > Fix the getNumRecordsInPerSecond Utility Function > - > > Key: FLINK-33966 > URL: https://issues.apache.org/jira/browse/FLINK-33966 > Project: Flink > Issue Type: Bug > Components: Autoscaler >Affects Versions: kubernetes-operator-1.7.0 >Reporter: Yang LI >Priority: Minor > > We have the following code in the codebase > {code:java} > if (isSource && (numRecordsInPerSecond == null || > numRecordsInPerSecond.getSum() == 0)) { > numRecordsInPerSecond = > > flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC); > }{code} > {code:java} > if (isSource && (numRecordsInPerSecond == null || > numRecordsInPerSecond.getSum() == 0)) { > numRecordsInPerSecond = > > flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC); > }{code} > i.e. the same condition is checked twice > > {*}Definition of done{*}: > Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} to {{if (!isSource && ...)}}. This addresses the redundant check and ensures correct metric fetching for non-source operators. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33966) Fix the getNumRecordsInPerSecond Utility Function
[ https://issues.apache.org/jira/browse/FLINK-33966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yang LI updated FLINK-33966: Description: We have the same condition check twice in the function getNumRecordsInPerSecond ([L220|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L220] and [L224|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L224]) {*}Definition of done{*}: Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} to {{if (!isSource && ...)}}. This addresses the redundant check and ensures correct metric fetching for non-source operators. was: We have the same condition check twice in the function getNumRecordsInPerSecond ([L220|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L220] and [L224|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L224]) Definition of done: Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} to {{if (!isSource && ...)}}. This addresses the redundant check and ensures correct metric fetching for non-source operators. > Fix the getNumRecordsInPerSecond Utility Function > - > > Key: FLINK-33966 > URL: https://issues.apache.org/jira/browse/FLINK-33966 > Project: Flink > Issue Type: Bug > Components: Autoscaler >Affects Versions: kubernetes-operator-1.7.0 >Reporter: Yang LI >Priority: Minor > > We have the same condition check twice in the function > getNumRecordsInPerSecond > ([L220|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L220] > and > [L224|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L224]) > > {*}Definition of done{*}: > Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} to {{if (!isSource && ...)}}. This addresses the redundant check and ensures correct metric fetching for non-source operators. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33966) Fix the getNumRecordsInPerSecond Utility Function
[ https://issues.apache.org/jira/browse/FLINK-33966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yang LI updated FLINK-33966: Description: We have the same condition check twice in the function getNumRecordsInPerSecond ([L220|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L220] and [L224|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L224]) Definition of done: Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} to {{if (!isSource && ...)}}. This addresses the redundant check and ensures correct metric fetching for non-source operators. was: We have the same condition check twice in the function getNumRecordsInPerSecond ([L220|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L220] and [L224|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L224]) Action: Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} to {{if (!isSource && ...)}}. This addresses the redundant check and ensures correct metric fetching for non-source operators. > Fix the getNumRecordsInPerSecond Utility Function > - > > Key: FLINK-33966 > URL: https://issues.apache.org/jira/browse/FLINK-33966 > Project: Flink > Issue Type: Bug > Components: Autoscaler >Affects Versions: kubernetes-operator-1.7.0 >Reporter: Yang LI >Priority: Minor > > We have the same condition check twice in the function > getNumRecordsInPerSecond > ([L220|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L220] > and > [L224|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L224]) > Definition of done: > Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} to {{if (!isSource && ...)}}. This addresses the redundant check and ensures correct metric fetching for non-source operators. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33966) Fix the getNumRecordsInPerSecond Utility Function
Yang LI created FLINK-33966: --- Summary: Fix the getNumRecordsInPerSecond Utility Function Key: FLINK-33966 URL: https://issues.apache.org/jira/browse/FLINK-33966 Project: Flink Issue Type: Bug Components: Autoscaler Affects Versions: kubernetes-operator-1.7.0 Reporter: Yang LI We have the same condition check twice in the function getNumRecordsInPerSecond ([L220|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L220] and [L224|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L224]) Action: Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} to {{if (!isSource && ...)}}. This addresses the redundant check and ensures correct metric fetching for non-source operators. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1438684576 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java: ## @@ -96,28 +97,115 @@ public JdbcDynamicTableSource( public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { // JDBC only support non-nested look up keys String[] keyNames = new String[context.getKeys().length]; + for (int i = 0; i < keyNames.length; i++) { int[] innerKeyArr = context.getKeys()[i]; Preconditions.checkArgument( innerKeyArr.length == 1, "JDBC only support non-nested look up keys"); keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]); } + final RowType rowType = (RowType) physicalRowDataType.getLogicalType(); + +String[] conditions = null; + +if (this.resolvedPredicates != null) { +conditions = new String[this.resolvedPredicates.size()]; +int processedPushdownParamsIndex = 0; +for (int i = 0; i < this.resolvedPredicates.size(); i++) { +String resolvedPredicate = this.resolvedPredicates.get(i); + +/* + * This replace seems like it should be using a Flink class to resolve the parameter. It does not + * affect the dialects as the placeholder comes from JdbcFilterPushdownPreparedStatementVisitor. + * + * Here is what has been considered as alternatives. + * + * We cannot use the way this is done in getScanRuntimeProvider, as the index we have is the index + * into the filters, but it needs the index into the fields. For example one lookup key and one filter + * would both have an index of 0, which the subsequent code would incorrectly resolve to the first + * field. + * We cannot use the PreparedStatement as we have not got access to the statement here. + * We cannot use ParameterizedPredicate as it takes the filter expression as input (e.g. EQUALS(...)), + * not the form we have here; an example would be ('field1'= ?). + * + * An entry in the resolvedPredicates list may have more than one associated pushdown parameter, for example + * a query like this : ... on e.type = 2 and (e.age = 50 OR height > 90) and a.ip = e.ip; + * will have 2 resolvedPredicates and 3 pushdownParams. The 2nd and 3rd pushdownParams will be for the second + * resolvedPredicate. + * + */ +ArrayList<String> paramsForThisPredicate = new ArrayList<>(); +char placeholderChar = + JdbcFilterPushdownPreparedStatementVisitor.PUSHDOWN_PREDICATE_PLACEHOLDER +.charAt(0); + +int count = +(int) resolvedPredicate.chars().filter(ch -> ch == placeholderChar).count(); + +for (int j = processedPushdownParamsIndex; +j < processedPushdownParamsIndex + count; +j++) { + paramsForThisPredicate.add(this.pushdownParams[j].toString()); +} +processedPushdownParamsIndex = processedPushdownParamsIndex + count; Review Comment: @snuyanzin Yes I had not thought of that. I have reviewed this. I see 3 options I am looking to explore: 1. amend the string processing to cope with the scenario you have highlighted. This is tricky as I will need to parse the string looking for simple binary operators and OR operators that link many simple expressions, when there might be quotes in the column names. This seems overly complicated and fragile. 2. Investigate whether I can change how `this.pushdownParams` and `this.resolvedPredicates` are populated or add more instance variables - so they can be simply processed without complex string logic. 3. remove the pushdown capability for lookup joins from this connector to enable the release. 
I think this is looking more attractive as the pragmatic approach, given what we are seeing. I hope 2. will be fruitful. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
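A self-contained sketch of the placeholder-counting idea discussed in the comment above; the placeholder character, sample predicates, and parameter values are assumptions for illustration (the real placeholder comes from JdbcFilterPushdownPreparedStatementVisitor), and the known limitation is exactly the one flagged in the review: counting characters breaks if a quoted column name contains the placeholder.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: map each resolved predicate to its slice of pushdown parameters
// by counting placeholder characters, as the review comment describes.
public class PredicateParamSlicer {
    public static void main(String[] args) {
        char placeholder = '?'; // assumption; defined elsewhere in the connector
        List<String> resolvedPredicates =
                Arrays.asList("e.type = ?", "(e.age = ? OR height > ?)");
        Object[] pushdownParams = {2, 50, 90};

        int consumed = 0;
        for (String predicate : resolvedPredicates) {
            int count = (int) predicate.chars().filter(ch -> ch == placeholder).count();
            List<String> paramsForPredicate = new ArrayList<>();
            for (int j = consumed; j < consumed + count; j++) {
                paramsForPredicate.add(pushdownParams[j].toString());
            }
            consumed += count;
            // prints: e.type = ? -> [2], then (e.age = ? OR height > ?) -> [50, 90]
            System.out.println(predicate + " -> " + paramsForPredicate);
        }
    }
}
{code}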
[jira] [Commented] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space
[ https://issues.apache.org/jira/browse/FLINK-33940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801843#comment-17801843 ] Maximilian Michels commented on FLINK-33940: [~Zhanghao Chen] Even though the factor only affects high-parallelism operators > 840, I wonder whether we need to leave more room for scaleup. But I don't have a strong opinion. {quote} IIUC, when the parallelism of one job is very small (it's 1 or 2) and the max parallelism is 1024, one subtask will have 1024 keyGroups. From the state backend side, too many key groups may affect the performance. (This is my concern about changing it by default in the Flink community.) {quote} [~fanrui] I think we need to find out how big the performance impact actually is when jumping from 128 to 840 key groups. But 128 may just have been a very conservative number. > Update the auto-derivation rule of max parallelism for enlarged upscaling > space > --- > > Key: FLINK-33940 > URL: https://issues.apache.org/jira/browse/FLINK-33940 > Project: Flink > Issue Type: Improvement > Components: API / Core >Reporter: Zhanghao Chen >Assignee: Zhanghao Chen >Priority: Major > > *Background* > The choice of the max parallelism of a stateful operator is important as it > limits the upper bound of the parallelism of the operator while it can also > add extra overhead when being set too large. Currently, the max parallelism > of an operator is either fixed to a value specified by API core / pipeline > option or auto-derived with the following rules: > {{min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)}} > *Problem* > Recently, the elasticity of Flink jobs is becoming more and more valued by > users. The current auto-derived max parallelism was introduced a long time > ago and only allows the operator parallelism to be roughly doubled, which is > not desired for elasticity. Setting a max parallelism manually may not be > desired either: users may not have sufficient expertise to select a good > max-parallelism value. > *Proposal* > Update the auto-derivation rule of max parallelism to derive larger max > parallelism for better elasticity experience out of the box. A candidate is > as follows: > {{min(max(roundUpToPowerOfTwo(operatorParallelism * {*}5{*}), {*}1024{*}), > 32767)}} > Looking forward to your opinions on this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
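For a concrete comparison of the two derivation rules quoted above, here is a worked sketch; it is not Flink's implementation, and the rounding applied to the fractional 1.5 factor is an assumption.

{code:java}
// Sketch: current vs. proposed max-parallelism derivation from the ticket.
public class MaxParallelismRules {
    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    // current: min(max(roundUpToPowerOfTwo(p * 1.5), 128), 32767)
    static int currentRule(int p) {
        return Math.min(Math.max(roundUpToPowerOfTwo((int) Math.ceil(p * 1.5)), 128), 32767);
    }

    // proposed candidate: min(max(roundUpToPowerOfTwo(p * 5), 1024), 32767)
    static int proposedRule(int p) {
        return Math.min(Math.max(roundUpToPowerOfTwo(p * 5), 1024), 32767);
    }

    public static void main(String[] args) {
        // e.g. p=4: current=128, proposed=1024; p=840: current=2048, proposed=8192
        for (int p : new int[] {4, 100, 840, 2000}) {
            System.out.println(p + ": current=" + currentRule(p)
                    + ", proposed=" + proposedRule(p));
        }
    }
}
{code}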
Re: [PR] [FLINK-20454][format][debezium] Allow to read metadata for debezium-avro-confluent format [flink]
creativedutchmen commented on PR #18744: URL: https://github.com/apache/flink/pull/18744#issuecomment-1874226714 This would be super helpful to have merged. Are there any plans on pushing this forward? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33694][gs-fs-hadoop] Support overriding GCS root URL [flink]
patricklucas commented on PR #23836: URL: https://github.com/apache/flink/pull/23836#issuecomment-1874157629 I do not think there is much room for unexpected behavior here. The surface area of the change is quite small and follows the existing model for translating a Hadoop config option specific to this filesystem implementation into its native SDK equivalent. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
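For context on the translation pattern described above, a hedged sketch follows; the option key "fs.gs.storage.root.url" and the factory shape are assumptions for illustration, not the PR's actual code.

{code:java}
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.apache.hadoop.conf.Configuration;

// Sketch: translate a filesystem-specific Hadoop option into its native
// GCS SDK equivalent when building the client.
public class GcsClientFactory {
    public static Storage create(Configuration hadoopConfig) {
        StorageOptions.Builder builder = StorageOptions.newBuilder();
        String rootUrl = hadoopConfig.get("fs.gs.storage.root.url"); // assumed key
        if (rootUrl != null) {
            // point the native client at a non-default endpoint
            builder.setHost(rootUrl);
        }
        return builder.build().getService();
    }
}
{code}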
[jira] [Commented] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space
[ https://issues.apache.org/jira/browse/FLINK-33940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801815#comment-17801815 ] Zhanghao Chen commented on FLINK-33940: --- [~mxm] There's no concrete rationale for selecting the factor 5; it's just a magic number that most users are satisfied with in our production env. I'd be good with 10 as well. Do you have any preference on the selection of the factor? > Update the auto-derivation rule of max parallelism for enlarged upscaling > space > --- > > Key: FLINK-33940 > URL: https://issues.apache.org/jira/browse/FLINK-33940 > Project: Flink > Issue Type: Improvement > Components: API / Core >Reporter: Zhanghao Chen >Assignee: Zhanghao Chen >Priority: Major > > *Background* > The choice of the max parallelism of a stateful operator is important as it > limits the upper bound of the parallelism of the operator while it can also > add extra overhead when being set too large. Currently, the max parallelism > of an operator is either fixed to a value specified by API core / pipeline > option or auto-derived with the following rules: > {{min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)}} > *Problem* > Recently, the elasticity of Flink jobs is becoming more and more valued by > users. The current auto-derived max parallelism was introduced a long time > ago and only allows the operator parallelism to be roughly doubled, which is > not desired for elasticity. Setting a max parallelism manually may not be > desired either: users may not have sufficient expertise to select a good > max-parallelism value. > *Proposal* > Update the auto-derivation rule of max parallelism to derive larger max > parallelism for better elasticity experience out of the box. A candidate is > as follows: > {{min(max(roundUpToPowerOfTwo(operatorParallelism * {*}5{*}), {*}1024{*}), > 32767)}} > Looking forward to your opinions on this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [hotfix] Update copyright NOTICE year to 2024 [flink]
flinkbot commented on PR #24018: URL: https://github.com/apache/flink/pull/24018#issuecomment-1874115436 ## CI report: * 42c6079b521c50cc7e0e8017c29ce70f634688d3 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] Update copyright NOTICE year to 2024 [flink]
caicancai commented on PR #24018: URL: https://github.com/apache/flink/pull/24018#issuecomment-1874107814 cc @1996fanrui -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [hotfix] Update copyright NOTICE year to 2024 [flink]
caicancai opened a new pull request, #24018: URL: https://github.com/apache/flink/pull/24018 ## What is the purpose of the change Update copyright NOTICE year to 2024 ## Brief change log Update copyright NOTICE year to 2024 ## Verifying this change Update copyright NOTICE year to 2024 ## Does this pull request potentially affect one of the following parts: no ## Documentation Does this pull request introduce a new feature? (no) If yes, how is the feature documented? (not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-33433) Support invoke async-profiler on Jobmanager through REST API
[ https://issues.apache.org/jira/browse/FLINK-33433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang resolved FLINK-33433. -- Fix Version/s: 1.19.0 Resolution: Fixed merged in master: 240494fd6169cb98b47808a003ee00804a780360...3efe9d2b09bedde89322594f0f3927004b6b1adf > Support invoke async-profiler on Jobmanager through REST API > > > Key: FLINK-33433 > URL: https://issues.apache.org/jira/browse/FLINK-33433 > Project: Flink > Issue Type: Sub-task > Components: Runtime / REST >Affects Versions: 1.19.0 >Reporter: Yu Chen >Assignee: Yu Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space
[ https://issues.apache.org/jira/browse/FLINK-33940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801790#comment-17801790 ] Maximilian Michels commented on FLINK-33940: [~Zhanghao Chen] Thank you for the proposal. I agree with using highly composite numbers, as this will provide more flexibility to the autoscaler. I'm not sure about the {{operatorParallelism * 5}}. What is the rationale for selecting this factor? Why not {{*10}}? > Update the auto-derivation rule of max parallelism for enlarged upscaling > space > --- > > Key: FLINK-33940 > URL: https://issues.apache.org/jira/browse/FLINK-33940 > Project: Flink > Issue Type: Improvement > Components: API / Core >Reporter: Zhanghao Chen >Assignee: Zhanghao Chen >Priority: Major > > *Background* > The choice of the max parallelism of a stateful operator is important as it > limits the upper bound of the parallelism of the operator while it can also > add extra overhead when being set too large. Currently, the max parallelism > of an operator is either fixed to a value specified by API core / pipeline > option or auto-derived with the following rules: > {{min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)}} > *Problem* > Recently, the elasticity of Flink jobs is becoming more and more valued by > users. The current auto-derived max parallelism was introduced a long time > ago and only allows the operator parallelism to be roughly doubled, which is > not desired for elasticity. Setting a max parallelism manually may not be > desired either: users may not have sufficient expertise to select a good > max-parallelism value. > *Proposal* > Update the auto-derivation rule of max parallelism to derive larger max > parallelism for better elasticity experience out of the box. A candidate is > as follows: > {{min(max(roundUpToPowerOfTwo(operatorParallelism * {*}5{*}), {*}1024{*}), > 32767)}} > Looking forward to your opinions on this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33433][rest] Introduce async-profiler to support profiling Job… [flink]
Myasuka closed pull request #23820: [FLINK-33433][rest] Introduce async-profiler to support profiling Job… URL: https://github.com/apache/flink/pull/23820 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1438684576 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java: ## @@ -96,28 +97,115 @@ public JdbcDynamicTableSource( public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { // JDBC only support non-nested look up keys String[] keyNames = new String[context.getKeys().length]; + for (int i = 0; i < keyNames.length; i++) { int[] innerKeyArr = context.getKeys()[i]; Preconditions.checkArgument( innerKeyArr.length == 1, "JDBC only support non-nested look up keys"); keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]); } + final RowType rowType = (RowType) physicalRowDataType.getLogicalType(); + +String[] conditions = null; + +if (this.resolvedPredicates != null) { +conditions = new String[this.resolvedPredicates.size()]; +int processedPushdownParamsIndex = 0; +for (int i = 0; i < this.resolvedPredicates.size(); i++) { +String resolvedPredicate = this.resolvedPredicates.get(i); + +/* + * This replace seems like it should be using a Flink class to resolve the parameter. It does not + * affect the dialects as the placeholder comes from JdbcFilterPushdownPreparedStatementVisitor. + * + * Here is what has been considered as alternatives. + * + * We cannot use the way this is done in getScanRuntimeProvider, as the index we have is the index + * into the filters, but it needs the index into the fields. For example one lookup key and one filter + * would both have an index of 0, which the subsequent code would incorrectly resolve to the first + * field. + * We cannot use the PreparedStatement as we have not got access to the statement here. + * We cannot use ParameterizedPredicate as it takes the filter expression as input (e.g. EQUALS(...)), + * not the form we have here; an example would be ('field1'= ?). + * + * An entry in the resolvedPredicates list may have more than one associated pushdown parameter, for example + * a query like this : ... on e.type = 2 and (e.age = 50 OR height > 90) and a.ip = e.ip; + * will have 2 resolvedPredicates and 3 pushdownParams. The 2nd and 3rd pushdownParams will be for the second + * resolvedPredicate. + * + */ +ArrayList<String> paramsForThisPredicate = new ArrayList<>(); +char placeholderChar = + JdbcFilterPushdownPreparedStatementVisitor.PUSHDOWN_PREDICATE_PLACEHOLDER +.charAt(0); + +int count = +(int) resolvedPredicate.chars().filter(ch -> ch == placeholderChar).count(); + +for (int j = processedPushdownParamsIndex; +j < processedPushdownParamsIndex + count; +j++) { + paramsForThisPredicate.add(this.pushdownParams[j].toString()); +} +processedPushdownParamsIndex = processedPushdownParamsIndex + count; Review Comment: @snuyanzin Yes I had not thought of that. I have reviewed this. I see 2 options I am looking to explore: 1. amend the string processing to cope with the scenario you have highlighted. This is tricky as I will need to parse the string looking for simple binary operators and OR operators that link many simple expressions, when there might be quotes in the column names. This seems overly complicated and fragile. 2. Investigate whether I can change how `this.pushdownParams` and `this.resolvedPredicates` are populated or add more instance variables - so they can be simply processed without complex string logic. I hope 2. will be fruitful. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33611) Support Large Protobuf Schemas
[ https://issues.apache.org/jira/browse/FLINK-33611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801783#comment-17801783 ] Benchao Li commented on FLINK-33611: {quote}the main proposal here is to now reuse the variable names across different split method scopes. This will greatly reduce the constant pool size. {quote} [~dsaisharath] I'm wondering where the "constants" come from; IIUC, local variables in a method should not affect the class's constant pool? > Support Large Protobuf Schemas > -- > > Key: FLINK-33611 > URL: https://issues.apache.org/jira/browse/FLINK-33611 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.18.0 >Reporter: Sai Sharath Dandi >Assignee: Sai Sharath Dandi >Priority: Major > Labels: pull-request-available > > h3. Background > Flink serializes and deserializes protobuf format data by calling the decode > or encode method in GeneratedProtoToRow_XXX.java generated by codegen to > parse byte[] data into Protobuf Java objects. FLINK-32650 has introduced the > ability to split the generated code to improve the performance for large > Protobuf schemas. However, this is still not sufficient to support some > larger protobuf schemas as the generated code exceeds the Java constant pool > size [limit|https://en.wikipedia.org/wiki/Java_class_file#The_constant_pool] > and we can see errors like "Too many constants" when trying to compile the > generated code. > *Solution* > Since we already have the code-splitting functionality in place, the > main proposal here is to reuse the variable names across different split > method scopes. This will greatly reduce the constant pool size. One more > optimization is to split the last code segment only when its size exceeds the > split threshold. Currently, the last segment of the generated code is always > split, which can lead to too many split methods and thus exceed the constant > pool size limit. -- This message was sent by Atlassian Jira (v8.20.10#820010)
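To illustrate the reuse idea under discussion, a hedged sketch follows. Whether variable names actually land in the constant pool depends on the compiler emitting local-variable debug info (each distinct name becomes a UTF8 entry only then), which is exactly the open question in the comment above; all names here are made up.

{code:java}
// Sketch: split methods reusing one small, fixed set of local variable names
// (field0, field1) instead of globally unique names per schema field
// (field1234, field1235, ...). If local-variable debug info is emitted,
// reuse keeps the number of distinct UTF8 constant-pool entries small
// even for very large generated classes.
public class GeneratedDecoderSketch {
    static void decodeSplit0(Object[] row) {
        long field0 = 1L;       // names scoped to this split method
        String field1 = "a";
        row[0] = field0;
        row[1] = field1;
    }

    static void decodeSplit1(Object[] row) {
        long field0 = 2L;       // same names reused, not field2/field3
        String field1 = "b";
        row[2] = field0;
        row[3] = field1;
    }
}
{code}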