[jira] [Commented] (FLINK-33971) Specifies whether to use HBase table that supports dynamic columns.

2024-01-02 Thread MOBIN (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802045#comment-17802045
 ] 

MOBIN commented on FLINK-33971:
---

https://github.com/apache/flink-connector-hbase/pull/36

> Specifies whether to use HBase table that supports dynamic columns.
> ---
>
> Key: FLINK-33971
> URL: https://issues.apache.org/jira/browse/FLINK-33971
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / HBase
>Reporter: MOBIN
>Priority: Minor
>
> Specifies whether to use HBase table that supports dynamic columns.
> Refer to the dynamic.table parameter in this document: 
> https://www.alibabacloud.com/help/en/flink/developer-reference/apsaradb-for-hbase-connector#section-ltp-3fy-9qv
> Sample code for a result table that supports dynamic columns
> CREATE TEMPORARY TABLE datagen_source (
>   id INT,
>   f1hour STRING,
>   f1deal BIGINT,
>   f2day STRING,
>   f2deal BIGINT
> ) WITH (
>   'connector'='datagen'
> );
> CREATE TEMPORARY TABLE hbase_sink (
>   rowkey INT,
>   f1 ROW<`hour` STRING, deal BIGINT>,
>   f2 ROW<`day` STRING, deal BIGINT>
> ) WITH (
>   'connector'='hbase-2.2',
>   'table-name'='',
>   'zookeeper.quorum'='',
>   'dynamic.table'='true'
> );
> INSERT INTO hbase_sink
> SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source;
> If dynamic.table is set to true, an HBase table that supports dynamic columns is 
> used.
> Two fields must be declared in the ROW type that corresponds to each column 
> family: the value of the first field indicates the name of the dynamic column, 
> and the value of the second field indicates the value of that column.
> For example, suppose the datagen_source table contains a row of data 
> indicating that the ID of the commodity is 1, the transaction amount of the 
> commodity between 10:00 and 11:00 is 100, and the transaction amount of the 
> commodity on July 26, 2020 is 1. In this case, a row whose rowkey is 1 is 
> inserted into the ApsaraDB for HBase table: f1:10 is 100, and f2:2020-7-26 is 
> 1.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-33971) Specifies whether to use HBase table that supports dynamic columns.

2024-01-02 Thread MOBIN (Jira)


[ https://issues.apache.org/jira/browse/FLINK-33971 ]


MOBIN deleted comment on FLINK-33971:
---

was (Author: mobin):
https://github.com/apache/flink-connector-hbase/pull/36

> Specifies whether to use HBase table that supports dynamic columns.
> ---
>
> Key: FLINK-33971
> URL: https://issues.apache.org/jira/browse/FLINK-33971
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / HBase
>Reporter: MOBIN
>Priority: Minor
>
> Specifies whether to use HBase table that supports dynamic columns.
> Refer to the dynamic.table parameter in this document: 
> https://www.alibabacloud.com/help/en/flink/developer-reference/apsaradb-for-hbase-connector#section-ltp-3fy-9qv
> Sample code for a result table that supports dynamic columns
> CREATE TEMPORARY TABLE datagen_source (
>   id INT,
>   f1hour STRING,
>   f1deal BIGINT,
>   f2day STRING,
>   f2deal BIGINT
> ) WITH (
>   'connector'='datagen'
> );
> CREATE TEMPORARY TABLE hbase_sink (
>   rowkey INT,
>   f1 ROW<`hour` STRING, deal BIGINT>,
>   f2 ROW<`day` STRING, deal BIGINT>
> ) WITH (
>   'connector'='hbase-2.2',
>   'table-name'='',
>   'zookeeper.quorum'='',
>   'dynamic.table'='true'
> );
> INSERT INTO hbase_sink
> SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source;
> If dynamic.table is set to true, an HBase table that supports dynamic columns is 
> used.
> Two fields must be declared in the ROW type that corresponds to each column 
> family: the value of the first field indicates the name of the dynamic column, 
> and the value of the second field indicates the value of that column.
> For example, suppose the datagen_source table contains a row of data 
> indicating that the ID of the commodity is 1, the transaction amount of the 
> commodity between 10:00 and 11:00 is 100, and the transaction amount of the 
> commodity on July 26, 2020 is 1. In this case, a row whose rowkey is 1 is 
> inserted into the ApsaraDB for HBase table: f1:10 is 100, and f2:2020-7-26 is 
> 1.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33971) Specifies whether to use HBase table that supports dynamic columns.

2024-01-02 Thread MOBIN (Jira)
MOBIN created FLINK-33971:
-

 Summary: Specifies whether to use HBase table that supports 
dynamic columns.
 Key: FLINK-33971
 URL: https://issues.apache.org/jira/browse/FLINK-33971
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / HBase
Reporter: MOBIN


Specifies whether to use HBase table that supports dynamic columns.

Refer to the dynamic.table parameter in this document: 
https://www.alibabacloud.com/help/en/flink/developer-reference/apsaradb-for-hbase-connector#section-ltp-3fy-9qv

Sample code for a result table that supports dynamic columns

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  f1hour STRING,
  f1deal BIGINT,
  f2day STRING,
  f2deal BIGINT
) WITH (
  'connector'='datagen'
);

CREATE TEMPORARY TABLE hbase_sink (
  rowkey INT,
  f1 ROW<`hour` STRING, deal BIGINT>,
  f2 ROW<`day` STRING, deal BIGINT>
) WITH (
  'connector'='hbase-2.2',
  'table-name'='',
  'zookeeper.quorum'='',
  'dynamic.table'='true'
);

INSERT INTO hbase_sink
SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source;
If dynamic.table is set to true, an HBase table that supports dynamic columns is 
used.
Two fields must be declared in the ROW type that corresponds to each column family: 
the value of the first field indicates the name of the dynamic column, and the 
value of the second field indicates the value of that column.

For example, suppose the datagen_source table contains a row of data indicating 
that the ID of the commodity is 1, the transaction amount of the 
commodity between 10:00 and 11:00 is 100, and the transaction amount of the 
commodity on July 26, 2020 is 1. In this case, a row whose rowkey is 1 is 
inserted into the ApsaraDB for HBase table: f1:10 is 100, and f2:2020-7-26 is 
1.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-26694) LookupFunction doesn't support multilevel inheritance

2024-01-02 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-26694.

Resolution: Fixed

Fixed in master(1.19): 6e78eb18524ead3abd60da0ca41751b45e0e2482
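
For context, a minimal Java sketch (illustrative only; neither the reporter's code 
nor the actual fix) of a lookup TableFunction declared through multi-level 
inheritance, the pattern described below that previously failed the planner's type 
inference:

{code:java}
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.TableFunction;

// Intermediate base class shared by several lookup functions.
abstract class AbstractDimLookupFunction extends TableFunction<RowData> {
    protected RowData toRow(int id, String name) {
        return GenericRowData.of(id, StringData.fromString(name));
    }
}

// The concrete function used in the lookup join only extends TableFunction
// indirectly, i.e. through a multi-level hierarchy.
public class DimTableLookupFunction extends AbstractDimLookupFunction {
    public void eval(Integer key) {
        // A real function would query the dimension store here.
        collect(toRow(key, "dummy"));
    }
}
{code}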

> LookupFunction doesn't support multilevel inheritance
> -
>
> Key: FLINK-26694
> URL: https://issues.apache.org/jira/browse/FLINK-26694
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.4
>Reporter: tsianglei
>Assignee: Xianxun Ye
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2022-03-17-11-19-21-269.png
>
>
> When we implement a lookup function with multilevel inheritance, an error 
> occurs.
> {code:java}
> org.apache.flink.table.api.ValidationException: Could not determine a type 
> inference for lookup function 'default_catalog.default_database.dimTable'. 
> Lookup functions support regular type inference. However, for convenience, 
> the output class can simply be a Row or RowData class in which case the input 
> and output types are derived from the table's schema with default conversion. 
>    at 
> org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.createLookupTypeInference(LookupJoinCodeGenerator.scala:270)
>     at 
> org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.generateLookupFunction(LookupJoinCodeGenerator.scala:166)
>     at 
> org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.generateAsyncLookupFunction(LookupJoinCodeGenerator.scala:118)
>     at 
> org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator.generateAsyncLookupFunction(LookupJoinCodeGenerator.scala)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.createAsyncLookupJoin(CommonExecLookupJoin.java:332)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.translateToPlanInternal(CommonExecLookupJoin.java:238)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:114)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:71)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:70)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:107)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:47)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:705)
>     at 
> org.apache.flink.table.api.internal.StatementSetImpl.explain(StatementSetImpl.java:118)
>     XXX
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:4

[jira] [Assigned] (FLINK-26694) LookupFunction doesn't support multilevel inheritance

2024-01-02 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu reassigned FLINK-26694:
--

Assignee: Xianxun Ye

> LookupFunction doesn't support multilevel inheritance
> -
>
> Key: FLINK-26694
> URL: https://issues.apache.org/jira/browse/FLINK-26694
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.4
>Reporter: tsianglei
>Assignee: Xianxun Ye
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
> Attachments: image-2022-03-17-11-19-21-269.png
>
>
> When we implement a lookup function with multilevel inheritance, an error 
> occurs.
> {code:java}
> org.apache.flink.table.api.ValidationException: Could not determine a type 
> inference for lookup function 'default_catalog.default_database.dimTable'. 
> Lookup functions support regular type inference. However, for convenience, 
> the output class can simply be a Row or RowData class in which case the input 
> and output types are derived from the table's schema with default conversion. 
>    at 
> org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.createLookupTypeInference(LookupJoinCodeGenerator.scala:270)
>     at 
> org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.generateLookupFunction(LookupJoinCodeGenerator.scala:166)
>     at 
> org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.generateAsyncLookupFunction(LookupJoinCodeGenerator.scala:118)
>     at 
> org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator.generateAsyncLookupFunction(LookupJoinCodeGenerator.scala)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.createAsyncLookupJoin(CommonExecLookupJoin.java:332)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.translateToPlanInternal(CommonExecLookupJoin.java:238)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:114)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:71)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:70)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:107)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:47)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:705)
>     at 
> org.apache.flink.table.api.internal.StatementSetImpl.explain(StatementSetImpl.java:118)
>     XXX
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.j

[jira] [Updated] (FLINK-26694) LookupFunction doesn't support multilevel inheritance

2024-01-02 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-26694:
---
Fix Version/s: 1.19.0

> LookupFunction doesn't support multilevel inheritance
> -
>
> Key: FLINK-26694
> URL: https://issues.apache.org/jira/browse/FLINK-26694
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.4
>Reporter: tsianglei
>Assignee: Xianxun Ye
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2022-03-17-11-19-21-269.png
>
>
> When we implement a lookup function with multilevel inheritance, an error 
> occurs.
> {code:java}
> org.apache.flink.table.api.ValidationException: Could not determine a type 
> inference for lookup function 'default_catalog.default_database.dimTable'. 
> Lookup functions support regular type inference. However, for convenience, 
> the output class can simply be a Row or RowData class in which case the input 
> and output types are derived from the table's schema with default conversion. 
>    at 
> org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.createLookupTypeInference(LookupJoinCodeGenerator.scala:270)
>     at 
> org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.generateLookupFunction(LookupJoinCodeGenerator.scala:166)
>     at 
> org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.generateAsyncLookupFunction(LookupJoinCodeGenerator.scala:118)
>     at 
> org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator.generateAsyncLookupFunction(LookupJoinCodeGenerator.scala)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.createAsyncLookupJoin(CommonExecLookupJoin.java:332)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.translateToPlanInternal(CommonExecLookupJoin.java:238)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:114)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:71)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:70)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:107)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:47)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:705)
>     at 
> org.apache.flink.table.api.internal.StatementSetImpl.explain(StatementSetImpl.java:118)
>     XXX
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at 
> org.junit.internal.runners.statements.InvokeMeth

Re: [PR] [FLINK-26694][table] Support lookup join via a multi-level inheritance of TableFunction [flink]

2024-01-02 Thread via GitHub


leonardBang merged PR #23684:
URL: https://github.com/apache/flink/pull/23684


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-20899][table-planner-blink] Fix ClassCastException when calculating cost in HepPlanner [flink]

2024-01-02 Thread via GitHub


leonardBang commented on PR #14588:
URL: https://github.com/apache/flink/pull/14588#issuecomment-1874950162

   Hey @godfreyhe, would you like to rebase this PR to fix the conflicts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Specifies whether to use HBase table that supports dynamic columns. [flink-connector-hbase]

2024-01-02 Thread via GitHub


MOBIN-F opened a new pull request, #36:
URL: https://github.com/apache/flink-connector-hbase/pull/36

   Specifies whether to use an HBase table that supports dynamic columns.
   
   Refer to the dynamic.table parameter in this document: 
[https://www.alibabacloud.com/help/en/flink/developer-reference/apsaradb-for-hbase-connector#section-ltp-3fy-9qv](https://www.alibabacloud.com/help/en/flink/developer-reference/apsaradb-for-hbase-connector#section-ltp-3fy-9qv)
   
   Sample code for a result table that supports dynamic columns
   ```
   CREATE TEMPORARY TABLE datagen_source (
 id INT,
 f1hour STRING,
 f1deal BIGINT,
 f2day STRING,
 f2deal BIGINT
   ) WITH (
 'connector'='datagen'
   );
   
   CREATE TEMPORARY TABLE hbase_sink (
 rowkey INT,
 f1 ROW<`hour` STRING, deal BIGINT>,
 f2 ROW<`day` STRING, deal BIGINT>
   ) WITH (
 'connector'='hbase-2.2',
 'table-name'='',
 'zookeeper.quorum'='',
 'dynamic.table'='true'
   );
   
   INSERT INTO hbase_sink
   SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source;
   ```
   
   If dynamic.table is set to true, an HBase table that supports dynamic columns 
is used.
   Two fields must be declared in the ROW type that corresponds to each column 
family: the value of the first field indicates the name of the dynamic column, and 
the value of the second field indicates the value of that column.
   
   For example, suppose the datagen_source table contains a row of data 
indicating that the ID of the commodity is 1, the transaction amount of the 
commodity between 10:00 and 11:00 is 100, and the transaction amount of the 
commodity on July 26, 2020 is 1. In this case, a row whose rowkey is 1 is 
inserted into the ApsaraDB for HBase table: f1:10 is 100, and f2:2020-7-26 is 
1.
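   
   As a concrete illustration (the sample values below are assumed for this 
example, not taken from the connector documentation), inserting the row described 
above would produce the HBase cells f1:10 = 100 and f2:2020-7-26 = 1 under 
rowkey 1:
   ```
   -- Hypothetical sample row: commodity id 1, hour bucket '10' with amount 100,
   -- day '2020-7-26' with amount 1.
   INSERT INTO hbase_sink
   SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal)
   FROM (
     VALUES (1, '10', CAST(100 AS BIGINT), '2020-7-26', CAST(1 AS BIGINT))
   ) AS t (id, f1hour, f1deal, f2day, f2deal);
   ```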


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Specifies whether to use HBase table that supports dynamic columns. [flink-connector-hbase]

2024-01-02 Thread via GitHub


boring-cyborg[bot] commented on PR #36:
URL: 
https://github.com/apache/flink-connector-hbase/pull/36#issuecomment-1874935171

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32958) Support VIEW as a source table in CREATE TABLE ... Like statement

2024-01-02 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802032#comment-17802032
 ] 

Martijn Visser commented on FLINK-32958:


Thanks [~fornaix] !

> Support VIEW as a source table in CREATE TABLE ... Like statement
> -
>
> Key: FLINK-32958
> URL: https://issues.apache.org/jira/browse/FLINK-32958
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: Han
>Priority: Major
>  Labels: pull-request-available
>
> We can't create a table from a view through the CREATE TABLE LIKE statement.
>  
> case 1:
> {code:sql}
> create view source_view as select id,val from source;
> create table sink with ('connector' = 'print') like source_view (excluding 
> all);
> insert into sink select * from source_view;{code}
> case 2
> {code:java}
> DataStreamSource source = ...;
> tEnv.createTemporaryView("source", source);
> tEnv.executeSql("create table sink with ('connector' = 'print') like source 
> (excluding all)");
> tEnv.executeSql("insert into sink select * from source");{code}
>  
> The above cases will throw an exception:
> {code:java}
> Source table '`default_catalog`.`default_database`.`source`' of the LIKE 
> clause can not be a VIEW{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33964) Flink documentation can't be build due to error in Pulsar docs

2024-01-02 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802031#comment-17802031
 ] 

Martijn Visser commented on FLINK-33964:


Thanks [~leonard] [~Tison]!

> Flink documentation can't be build due to error in Pulsar docs
> --
>
> Key: FLINK-33964
> URL: https://issues.apache.org/jira/browse/FLINK-33964
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar, Documentation
>Affects Versions: pulsar-4.1.0
>Reporter: Martijn Visser
>Assignee: Leonard Xu
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: pulsar-4.1.1
>
>
> https://github.com/apache/flink/actions/runs/7380766702/job/20078487743
> {code:java}
> Start building sites … 
> hugo v0.110.0-e32a493b7826d02763c3b79623952e625402b168+extended linux/amd64 
> BuildDate=2023-01-17T12:16:09Z VendorInfo=gohugoio
> Error: Error building site: 
> "/root/flink/docs/themes/connectors/content.zh/docs/connectors/datastream/pulsar.md:491:1":
>  failed to extract shortcode: template for shortcode 
> "generated/pulsar_admin_configuration" not found
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33970) Add necessary checks for connector document

2024-01-02 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu reassigned FLINK-33970:
--

Assignee: Zhongqiang Gong

> Add necessary checks for connector document
> ---
>
> Key: FLINK-33970
> URL: https://issues.apache.org/jira/browse/FLINK-33970
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Leonard Xu
>Assignee: Zhongqiang Gong
>Priority: Major
>
> In FLINK-33964, we found that the documentation files in independent connector 
> repos lack basic checks, such as broken-URL detection; this ticket aims to add 
> the necessary checks to avoid similar issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33970) Add necessary checks for connector document

2024-01-02 Thread Zhongqiang Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802025#comment-17802025
 ] 

Zhongqiang Gong commented on FLINK-33970:
-

[~leonard] I am willing to take this issue. Please assign it to me. Thank you~

> Add necessary checks for connector document
> ---
>
> Key: FLINK-33970
> URL: https://issues.apache.org/jira/browse/FLINK-33970
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Leonard Xu
>Priority: Major
>
> In FLINK-33964, we found that the documentation files in independent connector 
> repos lack basic checks, such as broken-URL detection; this ticket aims to add 
> the necessary checks to avoid similar issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33970) Add necessary checks for connector document

2024-01-02 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-33970:
---
Description: In FLINK-33964, we found that the documentation files in 
independent connector repos lack basic checks, such as broken-URL detection; this 
ticket aims to add the necessary checks to avoid similar issues.

> Add necessary checks for connector document
> ---
>
> Key: FLINK-33970
> URL: https://issues.apache.org/jira/browse/FLINK-33970
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Leonard Xu
>Priority: Major
>
> In FLINK-33964, we found that the documentation files in independent connector 
> repos lack basic checks, such as broken-URL detection; this ticket aims to add 
> the necessary checks to avoid similar issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33970) Add necessary checks for connector document

2024-01-02 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-33970:
--

 Summary: Add necessary checks for connector document
 Key: FLINK-33970
 URL: https://issues.apache.org/jira/browse/FLINK-33970
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Leonard Xu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-02 Thread via GitHub


WencongLiu commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1440107366


##
flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java:
##
@@ -39,7 +41,16 @@ public interface SourceReaderContext {
  */
 String getLocalHostName();
 
-/** @return The index of this subtask. */
+/**
+ * Get the index of this subtask.
+ *
+ * @deprecated This method is deprecated since Flink 1.19. All metadata 
about the task should be
+ * provided uniformly by {@link #getTaskInfo()}.
+ * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs
+ * FLIP-382: Unify the Provision of Diverse Metadata for Context-like 
APIs 
+ */
+@Deprecated
 int getIndexOfSubtask();

Review Comment:
   For these deprecated methods in the modified interfaces, I've changed them 
from abstract to default and provided default implementations.
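
   A minimal sketch (not the actual PR diff) of the deprecate-and-delegate pattern 
described above, assuming the new `TaskInfo` accessor exposes 
`getIndexOfThisSubtask()`; the interface name here is made up for illustration:
   ```java
   import org.apache.flink.api.common.TaskInfo;

   /** Hypothetical context interface after a FLIP-382-style change. */
   public interface ExampleReaderContext {

       /** New unified accessor for task metadata. */
       TaskInfo getTaskInfo();

       /** Old accessor kept as a deprecated default that delegates to the new one. */
       @Deprecated
       default int getIndexOfSubtask() {
           return getTaskInfo().getIndexOfThisSubtask();
       }
   }
   ```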



##
flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java:
##
@@ -84,14 +88,26 @@ enum FailureType {
  * Get the ID of the job.
  *
  * @return the ID of the job
+ * @deprecated This method is deprecated since Flink 1.19. All 
metadata about the job should
+ * be provided uniformly by {@link #getJobInfo()}.
+ * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs
+ * FLIP-382: Unify the Provision of Diverse Metadata for 
Context-like APIs 
  */
+@Deprecated

Review Comment:
   For these deprecated methods in the modified interfaces, I've changed them 
from abstract to default and provided default implementations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-02 Thread via GitHub


WencongLiu commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1440106348


##
flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java:
##
@@ -63,14 +67,27 @@ public interface RuntimeContext {
 /**
  * The ID of the current job. Note that Job ID can change in particular 
upon manual restart. The
  * returned ID should NOT be used for any job management tasks.
+ *
+ * @deprecated This method is deprecated since Flink 1.19. All metadata 
about the job should be
+ * provided uniformly by {@link #getJobInfo()}.
+ * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs
+ * FLIP-382: Unify the Provision of Diverse Metadata for Context-like 
APIs 
  */
+@Deprecated
 JobID getJobId();

Review Comment:
   Good point! For these deprecated methods in the modified interfaces, I've 
changed them from abstract to default and provided default implementations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-02 Thread via GitHub


WencongLiu commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1440105816


##
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java:
##
@@ -166,7 +169,10 @@ public class Task
 /** ID which identifies the slot in which the task is supposed to run. */
 private final AllocationID allocationId;
 
-/** TaskInfo object for this task. */
+/** The meta information of current job. */
+private final JobInfo jobInfo;

Review Comment:
   These code changes are a prerequisite for FLIP-382. I've moved them to the 
commit `[FLINK-33905][core] Add getJobInfo() method to Environment`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-02 Thread via GitHub


WencongLiu commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1440107273


##
flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java:
##
@@ -119,7 +120,14 @@ public interface Environment {
 Configuration getJobConfiguration();
 
 /**
- * Returns the {@link TaskInfo} object associated with this subtask
+ * Returns the {@link JobInfo} object associated with current job.
+ *
+ * @return JobInfo for current job
+ */
+JobInfo getJobInfo();

Review Comment:
   These code changes are a prerequisite for FLIP-382. I've moved them to the 
commit `[FLINK-33905][core] Add getJobInfo() method to Environment`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-02 Thread via GitHub


WencongLiu commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1440108370


##
flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java:
##
@@ -84,14 +88,26 @@ enum FailureType {
  * Get the ID of the job.
  *
  * @return the ID of the job
+ * @deprecated This method is deprecated since Flink 1.19. All 
metadata about the job should
+ * be provided uniformly by {@link #getJobInfo()}.
+ * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs
+ * FLIP-382: Unify the Provision of Diverse Metadata for 
Context-like APIs 
  */
+@Deprecated

Review Comment:
   For these deprecated methods in the modified interfaces, I've changed them 
from abstract to default and provided default implementations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-02 Thread via GitHub


WencongLiu commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1440108479


##
flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java:
##
@@ -124,5 +140,16 @@ enum FailureType {
  * @return the Executor pool
  */
 Executor getIOExecutor();
+
+/**
+ * Get the meta information of current job.
+ *
+ * @return the job meta information.
+ */
+@PublicEvolving
+@Nullable
+default JobInfo getJobInfo() {

Review Comment:
   I've changed the default methods including `getJobInfo()` and 
`getTaskInfo()` to be abstract.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-02 Thread via GitHub


WencongLiu commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1440108007


##
flink-core/src/main/java/org/apache/flink/api/connector/sink2/InitContext.java:
##
@@ -35,16 +40,37 @@ public interface InitContext {
  */
 long INITIAL_CHECKPOINT_ID = 1;
 
-/** @return The id of task where the committer is running. */
+/**
+ * Get the id of task where the committer is running.
+ *
+ * @deprecated This method is deprecated since Flink 1.19. All metadata 
about the task should be
+ * provided uniformly by {@link #getTaskInfo()}.
+ * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs
+ * FLIP-382: Unify the Provision of Diverse Metadata for Context-like 
APIs 
+ */
 int getSubtaskId();

Review Comment:
   I have annotated these deprecated methods with `@Deprecated` in the 
`InitContext`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-02 Thread via GitHub


WencongLiu commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1440107496


##
flink-core/src/main/java/org/apache/flink/api/connector/sink2/InitContext.java:
##
@@ -57,6 +83,34 @@ public interface InitContext {
 /**
  * The ID of the current job. Note that Job ID can change in particular 
upon manual restart. The
  * returned ID should NOT be used for any job management tasks.
+ *
+ * @deprecated This method is deprecated since Flink 1.19. All metadata 
about the job should be
+ * provided uniformly by {@link #getJobInfo()}.
+ * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs
+ * FLIP-382: Unify the Provision of Diverse Metadata for Context-like 
APIs 
  */
 JobID getJobId();
+
+/**
+ * Get the meta information of current job.
+ *
+ * @return the job meta information.
+ */
+@PublicEvolving
+@Nullable
+default JobInfo getJobInfo() {
+return null;
+}
+
+/**
+ * Get the meta information of current task.
+ *
+ * @return the task meta information.
+ */
+@PublicEvolving
+@Nullable
+default TaskInfo getTaskInfo() {
+return null;
+}

Review Comment:
   I've changed the default methods including `getJobInfo()` and 
`getTaskInfo()` to be abstract.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-02 Thread via GitHub


WencongLiu commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1440107425


##
flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java:
##
@@ -68,8 +79,24 @@ public interface SourceReaderContext {
  * Get the current parallelism of this Source.
  *
  * @return the parallelism of the Source.
+ * @deprecated This method is deprecated since Flink 1.19. All metadata 
about the task should be
+ * provided uniformly by {@link #getTaskInfo()}.
+ * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs
+ * FLIP-382: Unify the Provision of Diverse Metadata for Context-like 
APIs 
  */
+@Deprecated
 default int currentParallelism() {
 throw new UnsupportedOperationException();
 }
+
+/**
+ * Get the meta information of current task.
+ *
+ * @return the task meta information.
+ */
+@PublicEvolving
+default TaskInfo getTaskInfo() {
+return null;

Review Comment:
   I've changed the default methods including `getJobInfo()` and 
`getTaskInfo()` to be abstract.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-02 Thread via GitHub


WencongLiu commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1440107273


##
flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java:
##
@@ -119,7 +120,14 @@ public interface Environment {
 Configuration getJobConfiguration();
 
 /**
- * Returns the {@link TaskInfo} object associated with this subtask
+ * Returns the {@link JobInfo} object associated with current job.
+ *
+ * @return JobInfo for current job
+ */
+JobInfo getJobInfo();

Review Comment:
   These code changes are a prerequisite for FLIP-382. I've moved them to the 
commit `[FLINK-33905][core] Add getJobInfo() method to Environment`.



##
flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java:
##
@@ -39,7 +41,16 @@ public interface SourceReaderContext {
  */
 String getLocalHostName();
 
-/** @return The index of this subtask. */
+/**
+ * Get the index of this subtask.
+ *
+ * @deprecated This method is deprecated since Flink 1.19. All metadata 
about the task should be
+ * provided uniformly by {@link #getTaskInfo()}.
+ * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs
+ * FLIP-382: Unify the Provision of Diverse Metadata for Context-like 
APIs 
+ */
+@Deprecated
 int getIndexOfSubtask();

Review Comment:
   For these deprecated methods in the modified interfaces, I've changed them 
from abstract to default and provided default implementations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-02 Thread via GitHub


WencongLiu commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1440107033


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSinkTest.java:
##
@@ -127,7 +127,7 @@ public Long timestamp() {
 };
 
 private static RuntimeContext getMockRuntimeContext() {
-return new MockStreamingRuntimeContext(false, 0, 0) {
+return new MockStreamingRuntimeContext(false, 1, 0) {

Review Comment:
   These tests conflict with the newly added correctness check in 
`TaskInfoImpl`. For example, the subtask index must not be larger than or equal to 
the operator parallelism. I've fixed these errors in the commit 
`[hotfix] Fix the logical errors for the tests related to task metadata`.
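
   For reference, a sketch of the kind of check being referred to (assumed form; 
the exact validation in `TaskInfoImpl` may differ):
   ```java
   import static org.apache.flink.util.Preconditions.checkArgument;

   // Assumed shape of the validation: the subtask index must lie in [0, parallelism).
   static void validateSubtaskIndex(int indexOfThisSubtask, int numberOfParallelSubtasks) {
       checkArgument(
               indexOfThisSubtask >= 0 && indexOfThisSubtask < numberOfParallelSubtasks,
               "Subtask index %s must be smaller than the parallelism %s.",
               indexOfThisSubtask,
               numberOfParallelSubtasks);
   }
   ```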



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-02 Thread via GitHub


WencongLiu commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1440106774


##
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java:
##
@@ -110,38 +111,41 @@ public void testCepRuntimeContext() {
 final String taskName = "foobarTask";
 final OperatorMetricGroup metricGroup =
 UnregisteredMetricsGroup.createOperatorMetricGroup();
-final int numberOfParallelSubtasks = 42;
-final int indexOfSubtask = 43;
+final int numberOfParallelSubtasks = 43;
+final int indexOfSubtask = 42;
 final int attemptNumber = 1337;
-final String taskNameWithSubtask = "barfoo";
+final String taskNameWithSubtask = "foobarTask (43/43)#1337";

Review Comment:
   These tests conflict with the newly added correctness check in 
`TaskInfoImpl`. For example, the subtask index must not be larger than or equal to 
the operator parallelism. I've fixed these errors in the commit 
`[hotfix] Fix the logical errors for the tests related to task metadata`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-02 Thread via GitHub


WencongLiu commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1440106348


##
flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java:
##
@@ -63,14 +67,27 @@ public interface RuntimeContext {
 /**
  * The ID of the current job. Note that Job ID can change in particular 
upon manual restart. The
  * returned ID should NOT be used for any job management tasks.
+ *
+ * @deprecated This method is deprecated since Flink 1.19. All metadata 
about the job should be
+ * provided uniformly by {@link #getJobInfo()}.
+ * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs
+ * FLIP-382: Unify the Provision of Diverse Metadata for Context-like 
APIs 
  */
+@Deprecated
 JobID getJobId();

Review Comment:
   Good point! For these deprecated methods in the modified interfaces, I've 
changed them from abstract to default and provided default implementations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-02 Thread via GitHub


WencongLiu commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1440106462


##
flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java:
##
@@ -42,6 +43,7 @@
 import java.util.List;
 import java.util.concurrent.Future;
 
+/** The test for flat map operator. */

Review Comment:
   These code-style clean-ups have been placed into the commit `[hotfix] Fix 
the lacking class comments for the tests using TaskInfo`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-02 Thread via GitHub


WencongLiu commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1440106155


##
flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java:
##
@@ -473,4 +520,26 @@  AggregatingState 
getAggregatingState(
  */
 @PublicEvolving
  MapState getMapState(MapStateDescriptor 
stateProperties);
+
+/**
+ * Get the meta information of current job.
+ *
+ * @return the job meta information.
+ */
+@PublicEvolving
+@Nullable
+default JobInfo getJobInfo() {
+return null;
+}
+
+/**
+ * Get the meta information of current task.
+ *
+ * @return the task meta information.
+ */
+@PublicEvolving
+@Nullable
+default TaskInfo getTaskInfo() {
+return null;
+}

Review Comment:
   I've changed the default methods `getJobInfo()` and `getTaskInfo()` to be 
abstract.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33941][table-planner] use field index instead of field name about window time column [flink]

2024-01-02 Thread via GitHub


LadyForest commented on code in PR #23991:
URL: https://github.com/apache/flink/pull/23991#discussion_r1440106137


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateProjectMergeRule.java:
##
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.planner.plan.rules.logical;
 
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;

Review Comment:
   Nit: update the FLINK MODIFICATION line number mentioned at L#59



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-02 Thread via GitHub


WencongLiu commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1440105816


##
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java:
##
@@ -166,7 +169,10 @@ public class Task
 /** ID which identifies the slot in which the task is supposed to run. */
 private final AllocationID allocationId;
 
-/** TaskInfo object for this task. */
+/** The meta information of current job. */
+private final JobInfo jobInfo;

Review Comment:
   These code changes are a prerequisite for FLIP-382. I've moved them to the 
commit `[FLINK-33905][core] Add getJobInfo() method to Environment`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-02 Thread via GitHub


WencongLiu commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1440105629


##
flink-core/src/main/java/org/apache/flink/api/common/JobInfoImpl.java:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The default implementation of {@link JobInfo}. */
+@PublicEvolving

Review Comment:
   The `JobInfoImpl` class has been annotated with `@Internal`.



##
flink-core/src/main/java/org/apache/flink/api/common/TaskInfoImpl.java:
##
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The default implementation of {@link TaskInfo}. */
+@PublicEvolving

Review Comment:
   The `TaskInfoImpl` class has been annotated with `@Internal`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33966] Fix the getNumRecordsInPerSecond Utility Function [flink-kubernetes-operator]

2024-01-02 Thread via GitHub


1996fanrui commented on code in PR #743:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/743#discussion_r1440105061


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##
@@ -221,7 +221,7 @@ private static double getNumRecordsInPerSecond(
 numRecordsInPerSecond =
 
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
 }
-if (isSource && (numRecordsInPerSecond == null || 
numRecordsInPerSecond.getSum() == 0)) {
+if (!isSource && (numRecordsInPerSecond == null || 
numRecordsInPerSecond.getSum() == 0)) {
 numRecordsInPerSecond =
 
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
 }

Review Comment:
   By the way, the current change doesn't make sense.
   
   When a task isn't a source, it doesn't have the 
`SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC` metric. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33966] Fix the getNumRecordsInPerSecond Utility Function [flink-kubernetes-operator]

2024-01-02 Thread via GitHub


1996fanrui commented on code in PR #743:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/743#discussion_r1440104568


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##
@@ -221,7 +221,7 @@ private static double getNumRecordsInPerSecond(
 numRecordsInPerSecond =
 
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
 }
-if (isSource && (numRecordsInPerSecond == null || 
numRecordsInPerSecond.getSum() == 0)) {
+if (!isSource && (numRecordsInPerSecond == null || 
numRecordsInPerSecond.getSum() == 0)) {
 numRecordsInPerSecond =
 
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
 }

Review Comment:
   Thanks @Yang-LI-CS for the PR!
   
   These two conditions are indeed the same, which may be unexpected at first 
glance. My guess is that the intended logic reads numRecordsInPerSecond from 
three metrics in order:
   
   1. FlinkMetric.NUM_RECORDS_IN_PER_SEC
   2. FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC
   3. FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC
   
   When 1 is unavailable, fall back to 2; when 2 is unavailable, fall back to 3.
   
   I'm not sure about this and need to confirm with @mxm.
   
   - If my guess is right, it's not a bug, and I suggest this PR add a short 
comment to clarify it.
   - If my guess is wrong, it's a bug and we can fix it.
   
   WDYT?
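
   A minimal sketch of the fallback chain guessed above (stand-in types for
   illustration only, not the operator's actual `ScalingMetrics` code):

   ```java
   import java.util.EnumMap;
   import java.util.Map;

   /** Illustration of the guessed three-step fallback; stand-in types, not ScalingMetrics. */
   public class NumRecordsInFallbackSketch {

       enum FlinkMetric {
           NUM_RECORDS_IN_PER_SEC,
           SOURCE_TASK_NUM_RECORDS_IN_PER_SEC,
           SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC
       }

       /** Stand-in for the aggregated metric; only the sum matters here. */
       record Agg(double sum) {}

       static double numRecordsInPerSecond(Map<FlinkMetric, Agg> metrics, boolean isSource) {
           Agg v = metrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
           // Fallback 1: sources may only report the source-specific "records in" metric.
           if (isSource && (v == null || v.sum() == 0)) {
               v = metrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
           }
           // Fallback 2: if that is also missing, use the source task's "records out" metric.
           if (isSource && (v == null || v.sum() == 0)) {
               v = metrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
           }
           return v == null ? Double.NaN : v.sum();
       }

       public static void main(String[] args) {
           Map<FlinkMetric, Agg> metrics = new EnumMap<>(FlinkMetric.class);
           metrics.put(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC, new Agg(42.0));
           // A source task with only the third metric available still resolves to 42.0.
           System.out.println(numRecordsInPerSecond(metrics, true));
       }
   }
   ```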



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-33964) Flink documentation can't be build due to error in Pulsar docs

2024-01-02 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-33964.

Resolution: Fixed

> Flink documentation can't be build due to error in Pulsar docs
> --
>
> Key: FLINK-33964
> URL: https://issues.apache.org/jira/browse/FLINK-33964
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar, Documentation
>Affects Versions: pulsar-4.1.0
>Reporter: Martijn Visser
>Assignee: Leonard Xu
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: pulsar-4.1.1
>
>
> https://github.com/apache/flink/actions/runs/7380766702/job/20078487743
> {code:java}
> Start building sites … 
> hugo v0.110.0-e32a493b7826d02763c3b79623952e625402b168+extended linux/amd64 
> BuildDate=2023-01-17T12:16:09Z VendorInfo=gohugoio
> Error: Error building site: 
> "/root/flink/docs/themes/connectors/content.zh/docs/connectors/datastream/pulsar.md:491:1":
>  failed to extract shortcode: template for shortcode 
> "generated/pulsar_admin_configuration" not found
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33964) Flink documentation can't be build due to error in Pulsar docs

2024-01-02 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802021#comment-17802021
 ] 

Leonard Xu commented on FLINK-33964:


Fixed in flink-connector-pulsar via 

main: 2be651a043b66936ec787033a851ca8c9d7cf95e

v4.1: 05e58b74947dcf46583e7fd1565d459c7b4a44a2

> Flink documentation can't be build due to error in Pulsar docs
> --
>
> Key: FLINK-33964
> URL: https://issues.apache.org/jira/browse/FLINK-33964
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar, Documentation
>Affects Versions: pulsar-4.1.0
>Reporter: Martijn Visser
>Assignee: Leonard Xu
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: pulsar-4.1.1
>
>
> https://github.com/apache/flink/actions/runs/7380766702/job/20078487743
> {code:java}
> Start building sites … 
> hugo v0.110.0-e32a493b7826d02763c3b79623952e625402b168+extended linux/amd64 
> BuildDate=2023-01-17T12:16:09Z VendorInfo=gohugoio
> Error: Error building site: 
> "/root/flink/docs/themes/connectors/content.zh/docs/connectors/datastream/pulsar.md:491:1":
>  failed to extract shortcode: template for shortcode 
> "generated/pulsar_admin_configuration" not found
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33941][table-planner] use field index instead of field name about window time column [flink]

2024-01-02 Thread via GitHub


LadyForest commented on code in PR #23991:
URL: https://github.com/apache/flink/pull/23991#discussion_r1437490488


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala:
##
@@ -1117,6 +1117,7 @@ object AggregateUtil extends Enumeration {
   }
 
   /** Compute field index of given timeField expression. */
+  @Deprecated

Review Comment:
   Can we just remove this method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33964) Flink documentation can't be build due to error in Pulsar docs

2024-01-02 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-33964:
---
Fix Version/s: pulsar-4.1.1

> Flink documentation can't be build due to error in Pulsar docs
> --
>
> Key: FLINK-33964
> URL: https://issues.apache.org/jira/browse/FLINK-33964
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar, Documentation
>Affects Versions: 1.18.0
>Reporter: Martijn Visser
>Assignee: Leonard Xu
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: pulsar-4.1.1
>
>
> https://github.com/apache/flink/actions/runs/7380766702/job/20078487743
> {code:java}
> Start building sites … 
> hugo v0.110.0-e32a493b7826d02763c3b79623952e625402b168+extended linux/amd64 
> BuildDate=2023-01-17T12:16:09Z VendorInfo=gohugoio
> Error: Error building site: 
> "/root/flink/docs/themes/connectors/content.zh/docs/connectors/datastream/pulsar.md:491:1":
>  failed to extract shortcode: template for shortcode 
> "generated/pulsar_admin_configuration" not found
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33964) Flink documentation can't be build due to error in Pulsar docs

2024-01-02 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-33964:
---
Affects Version/s: pulsar-4.1.0
   (was: 1.18.0)

> Flink documentation can't be build due to error in Pulsar docs
> --
>
> Key: FLINK-33964
> URL: https://issues.apache.org/jira/browse/FLINK-33964
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar, Documentation
>Affects Versions: pulsar-4.1.0
>Reporter: Martijn Visser
>Assignee: Leonard Xu
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: pulsar-4.1.1
>
>
> https://github.com/apache/flink/actions/runs/7380766702/job/20078487743
> {code:java}
> Start building sites … 
> hugo v0.110.0-e32a493b7826d02763c3b79623952e625402b168+extended linux/amd64 
> BuildDate=2023-01-17T12:16:09Z VendorInfo=gohugoio
> Error: Error building site: 
> "/root/flink/docs/themes/connectors/content.zh/docs/connectors/datastream/pulsar.md:491:1":
>  failed to extract shortcode: template for shortcode 
> "generated/pulsar_admin_configuration" not found
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33964) Flink documentation can't be build due to error in Pulsar docs

2024-01-02 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-33964:
---
Affects Version/s: 1.18.0

> Flink documentation can't be build due to error in Pulsar docs
> --
>
> Key: FLINK-33964
> URL: https://issues.apache.org/jira/browse/FLINK-33964
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar, Documentation
>Affects Versions: 1.18.0
>Reporter: Martijn Visser
>Assignee: Leonard Xu
>Priority: Blocker
>  Labels: pull-request-available
>
> https://github.com/apache/flink/actions/runs/7380766702/job/20078487743
> {code:java}
> Start building sites … 
> hugo v0.110.0-e32a493b7826d02763c3b79623952e625402b168+extended linux/amd64 
> BuildDate=2023-01-17T12:16:09Z VendorInfo=gohugoio
> Error: Error building site: 
> "/root/flink/docs/themes/connectors/content.zh/docs/connectors/datastream/pulsar.md:491:1":
>  failed to extract shortcode: template for shortcode 
> "generated/pulsar_admin_configuration" not found
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33964) Flink documentation can't be build due to error in Pulsar docs

2024-01-02 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802019#comment-17802019
 ] 

Leonard Xu commented on FLINK-33964:


The documentation build action has been fixed: 
https://github.com/apache/flink/actions/runs/7391764752/job/20114595324

> Flink documentation can't be build due to error in Pulsar docs
> --
>
> Key: FLINK-33964
> URL: https://issues.apache.org/jira/browse/FLINK-33964
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar, Documentation
>Reporter: Martijn Visser
>Assignee: Leonard Xu
>Priority: Blocker
>  Labels: pull-request-available
>
> https://github.com/apache/flink/actions/runs/7380766702/job/20078487743
> {code:java}
> Start building sites … 
> hugo v0.110.0-e32a493b7826d02763c3b79623952e625402b168+extended linux/amd64 
> BuildDate=2023-01-17T12:16:09Z VendorInfo=gohugoio
> Error: Error building site: 
> "/root/flink/docs/themes/connectors/content.zh/docs/connectors/datastream/pulsar.md:491:1":
>  failed to extract shortcode: template for shortcode 
> "generated/pulsar_admin_configuration" not found
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2024-01-02 Thread via GitHub


AncyRominus commented on code in PR #733:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1440100481


##
flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java:
##
@@ -17,43 +17,88 @@
 
 package org.apache.flink.kubernetes.operator.admission.mutator;
 
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.informers.cache.Cache;
 import io.javaoperatorsdk.webhook.admission.NotAllowedException;
 import io.javaoperatorsdk.webhook.admission.Operation;
 import io.javaoperatorsdk.webhook.admission.mutation.Mutator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.Optional;
+import java.util.Set;
 
 /** The default mutator. */
 public class FlinkMutator implements Mutator {
 private static final Logger LOG = 
LoggerFactory.getLogger(FlinkMutator.class);
 private static final ObjectMapper mapper = new ObjectMapper();
+private final Set<FlinkResourceMutator> mutators;
+private final InformerManager informerManager;
+
+public FlinkMutator(Set<FlinkResourceMutator> mutators, InformerManager 
informerManager) {
+this.mutators = mutators;
+this.informerManager = informerManager;
+}
 
 @Override
 public HasMetadata mutate(HasMetadata resource, Operation operation)
 throws NotAllowedException {
-if (operation == Operation.CREATE) {
+if (operation == Operation.CREATE || operation == Operation.UPDATE) {
 LOG.debug("Mutating resource {}", resource);
-
 if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) {
-try {
-var sessionJob = mapper.convertValue(resource, 
FlinkSessionJob.class);
-setSessionTargetLabel(sessionJob);
-return sessionJob;
-} catch (Exception e) {
-throw new RuntimeException(e);
-}
+return mutateSessionJob(resource);
+}
+if (CrdConstants.KIND_FLINK_DEPLOYMENT.equals(resource.getKind())) 
{
+return mutateDeployment(resource);
 }
 }
 return resource;
 }
 
+private FlinkSessionJob mutateSessionJob(HasMetadata resource) {
+try {
+var sessionJob = mapper.convertValue(resource, 
FlinkSessionJob.class);
+var namespace = sessionJob.getMetadata().getNamespace();
+var deploymentName = sessionJob.getSpec().getDeploymentName();
+var key = Cache.namespaceKeyFunc(namespace, deploymentName);
+var deployment =
+
informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key);
+
+setSessionTargetLabel(sessionJob);

Review Comment:
   Thanks @gyfora! 
   Yes, that's a good point. I have added the logic for setting the session target 
label as part of the default mutator logic.
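
   For illustration, a hypothetical sketch of a custom mutator plugged into this
   chain. The `FlinkResourceMutator` method names used below (`mutateDeployment`,
   `mutateSessionJob`) are assumptions made for the example, not necessarily the
   exact interface introduced by this PR:

   ```java
   import java.util.HashMap;

   import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
   import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
   import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;

   /**
    * Hypothetical custom mutator that stamps a team label onto every FlinkDeployment.
    * Method names are illustrative assumptions; check the FlinkResourceMutator
    * interface added by this PR for the real contract.
    */
   public class TeamLabelMutator implements FlinkResourceMutator {

       @Override
       public FlinkDeployment mutateDeployment(FlinkDeployment deployment) {
           if (deployment.getMetadata().getLabels() == null) {
               deployment.getMetadata().setLabels(new HashMap<>());
           }
           deployment.getMetadata().getLabels().putIfAbsent("team", "data-platform");
           return deployment;
       }

       @Override
       public FlinkSessionJob mutateSessionJob(FlinkSessionJob sessionJob, FlinkDeployment session) {
           // No-op for session jobs in this sketch.
           return sessionJob;
       }
   }
   ```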



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2024-01-02 Thread via GitHub


AncyRominus commented on code in PR #733:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1440100481


##
flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java:
##
@@ -17,43 +17,88 @@
 
 package org.apache.flink.kubernetes.operator.admission.mutator;
 
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.informers.cache.Cache;
 import io.javaoperatorsdk.webhook.admission.NotAllowedException;
 import io.javaoperatorsdk.webhook.admission.Operation;
 import io.javaoperatorsdk.webhook.admission.mutation.Mutator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.Optional;
+import java.util.Set;
 
 /** The default mutator. */
 public class FlinkMutator implements Mutator {
 private static final Logger LOG = 
LoggerFactory.getLogger(FlinkMutator.class);
 private static final ObjectMapper mapper = new ObjectMapper();
+private final Set<FlinkResourceMutator> mutators;
+private final InformerManager informerManager;
+
+public FlinkMutator(Set<FlinkResourceMutator> mutators, InformerManager 
informerManager) {
+this.mutators = mutators;
+this.informerManager = informerManager;
+}
 
 @Override
 public HasMetadata mutate(HasMetadata resource, Operation operation)
 throws NotAllowedException {
-if (operation == Operation.CREATE) {
+if (operation == Operation.CREATE || operation == Operation.UPDATE) {
 LOG.debug("Mutating resource {}", resource);
-
 if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) {
-try {
-var sessionJob = mapper.convertValue(resource, 
FlinkSessionJob.class);
-setSessionTargetLabel(sessionJob);
-return sessionJob;
-} catch (Exception e) {
-throw new RuntimeException(e);
-}
+return mutateSessionJob(resource);
+}
+if (CrdConstants.KIND_FLINK_DEPLOYMENT.equals(resource.getKind())) 
{
+return mutateDeployment(resource);
 }
 }
 return resource;
 }
 
+private FlinkSessionJob mutateSessionJob(HasMetadata resource) {
+try {
+var sessionJob = mapper.convertValue(resource, 
FlinkSessionJob.class);
+var namespace = sessionJob.getMetadata().getNamespace();
+var deploymentName = sessionJob.getSpec().getDeploymentName();
+var key = Cache.namespaceKeyFunc(namespace, deploymentName);
+var deployment =
+
informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key);
+
+setSessionTargetLabel(sessionJob);

Review Comment:
   Thanks @gyfora! Added the logic for setting the session target label as part of 
the default mutator logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33884] Update Pulsar dependency to 3.0.2 in Pulsar Connector [flink-connector-pulsar]

2024-01-02 Thread via GitHub


dchristle commented on PR #72:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/72#issuecomment-1874876697

   > @dchristle I've triggered the CI, but I'm expecting that upgrading the 
version will also have impact on the license checks (the listed versions in the 
NOTICE files)
   
   Got it. Yes, I see the CI check for the NOTICE file failed due to 
out-of-date info. I updated the dependency versions in the META-INF NOTICE 
file. I also updated the year to 2024, since an outdated year could also prevent 
the CI check from passing. We can bump the year in a separate hotfix PR instead, 
if you'd prefer.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] Update copyright NOTICE year to 2024 [flink]

2024-01-02 Thread via GitHub


1996fanrui merged PR #24018:
URL: https://github.com/apache/flink/pull/24018


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33969] Implement restore tests for TableSourceScan node [flink]

2024-01-02 Thread via GitHub


flinkbot commented on PR #24020:
URL: https://github.com/apache/flink/pull/24020#issuecomment-1874824806

   
   ## CI report:
   
   * 32e25c7d0a6c58dc23ae28437f7159bf449bdce1 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33969) Implement restore tests for TableSourceScan node

2024-01-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33969:
---
Labels: pull-request-available  (was: )

> Implement restore tests for TableSourceScan node
> 
>
> Key: FLINK-33969
> URL: https://issues.apache.org/jira/browse/FLINK-33969
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Bonnie Varghese
>Assignee: Bonnie Varghese
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33969] Implement restore tests for TableSourceScan node [flink]

2024-01-02 Thread via GitHub


bvarghese1 opened a new pull request, #24020:
URL: https://github.com/apache/flink/pull/24020

   
   
   ## What is the purpose of the change
   
   *Add restore tests for TableSourceScan node*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Added restore tests for TableSourceScan node which verifies the generated 
compiled plan with the saved compiled plan.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector:  (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-33969) Implement restore tests for TableSourceScan node

2024-01-02 Thread Bonnie Varghese (Jira)
Bonnie Varghese created FLINK-33969:
---

 Summary: Implement restore tests for TableSourceScan node
 Key: FLINK-33969
 URL: https://issues.apache.org/jira/browse/FLINK-33969
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Bonnie Varghese
Assignee: Bonnie Varghese






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33814) Autoscaler Standalone control loop supports multiple thread

2024-01-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33814:
---
Labels: pull-request-available  (was: )

> Autoscaler Standalone control loop supports multiple thread
> ---
>
> Key: FLINK-33814
> URL: https://issues.apache.org/jira/browse/FLINK-33814
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> When the job list has a lot of jobs, a single thread isn't enough.
> So letting the Autoscaler Standalone control loop support multiple threads is 
> very useful for massive production; it's similar to 
> kubernetes.operator.reconcile.parallelism.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33814][autoscaler] Autoscaler Standalone control loop supports multiple threads [flink-kubernetes-operator]

2024-01-02 Thread via GitHub


1996fanrui opened a new pull request, #744:
URL: https://github.com/apache/flink-kubernetes-operator/pull/744

   ## What is the purpose of the change
   
   When the job list has a lot of jobs, a single thread isn't enough. So letting 
the Autoscaler Standalone control loop support multiple threads is very useful for 
massive production; it's similar to `kubernetes.operator.reconcile.parallelism`.
   
   
   ## Brief change log
   
   Introduce `autoscaler.standalone.control-loop.parallelism`.
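
   A rough sketch of the idea behind the new option (the option name comes from 
this PR, but the executor wiring below is an editorial simplification, not the 
actual `StandaloneAutoscalerExecutor` implementation):

   ```java
   import java.util.List;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;

   /** Simplified sketch: scaling each job on a shared pool instead of a single thread. */
   public class ParallelControlLoopSketch {

       public static void main(String[] args) throws InterruptedException {
           // Stand-in for autoscaler.standalone.control-loop.parallelism.
           int parallelism = 4;
           ExecutorService scalingPool = Executors.newFixedThreadPool(parallelism);

           for (String job : List.of("job-a", "job-b", "job-c", "job-d", "job-e")) {
               // Each job's scaling decision runs independently, so one slow job
               // no longer blocks the whole control loop.
               scalingPool.submit(() -> System.out.println("scaling " + job));
           }

           scalingPool.shutdown();
           scalingPool.awaitTermination(10, TimeUnit.SECONDS);
       }
   }
   ```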
   
   ## Verifying this change
   
 - Added `StandaloneAutoscalerExecutorTest#testScalingParallelism`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
no
 - Core observer or reconciler logic that is regularly executed: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-33965) Refactor the configuration for autoscaler standalone

2024-01-02 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan resolved FLINK-33965.
-
Fix Version/s: kubernetes-operator-1.8.0
   Resolution: Fixed

> Refactor the configuration for autoscaler standalone
> 
>
> Key: FLINK-33965
> URL: https://issues.apache.org/jira/browse/FLINK-33965
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Currently, all configurations of autoscaler standalone are maintained as 
> string keys.
> When autoscaler standalone had only a few options, this was easy to maintain. 
> However, it becomes hard to maintain as we add more options.
> Developing the JDBC autoscaler state store and multi-threaded control loop 
> support will introduce more options.
> h2. Solution:
> Introduce AutoscalerStandaloneOptions to manage all options of 
> autoscaler standalone, and generate the docs for it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-30535] Introduce TTL state based benchmarks [flink-benchmarks]

2024-01-02 Thread via GitHub


Zakelly commented on PR #83:
URL: https://github.com/apache/flink-benchmarks/pull/83#issuecomment-1874802825

   @Myasuka Kind reminder~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33965) Refactor the configuration for autoscaler standalone

2024-01-02 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801998#comment-17801998
 ] 

Rui Fan commented on FLINK-33965:
-

Merged to main (1.8.0) via fa938439ad8a7601462a3be268077aa4078881bc and 
3cae5907d3a5c610a1aa8e5d41095ddbcba676ef

> Refactor the configuration for autoscaler standalone
> 
>
> Key: FLINK-33965
> URL: https://issues.apache.org/jira/browse/FLINK-33965
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
>
> Currently, all configurations of autoscaler standalone are maintained as 
> string keys.
> When autoscaler standalone had only a few options, this was easy to maintain. 
> However, it becomes hard to maintain as we add more options.
> Developing the JDBC autoscaler state store and multi-threaded control loop 
> support will introduce more options.
> h2. Solution:
> Introduce AutoscalerStandaloneOptions to manage all options of 
> autoscaler standalone, and generate the docs for it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33965][autoscaler] Introducing the AutoscalerStandaloneOptions to simplify options for autoscaler standalone [flink-kubernetes-operator]

2024-01-02 Thread via GitHub


1996fanrui merged PR #742:
URL: https://github.com/apache/flink-kubernetes-operator/pull/742


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33968][runtime] Advance the calculation of num of subpartitions to the time of initializing execution job vertex [flink]

2024-01-02 Thread via GitHub


flinkbot commented on PR #24019:
URL: https://github.com/apache/flink/pull/24019#issuecomment-1874793432

   
   ## CI report:
   
   * 433585f7f7fb48fe903a8beafce86e0bfec5d776 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33968][runtime] Advance the calculation of num of subpartitions to the time of initializing execution job vertex [flink]

2024-01-02 Thread via GitHub


wanglijie95 commented on PR #24019:
URL: https://github.com/apache/flink/pull/24019#issuecomment-1874792567

   @JunRuiLee @zhuzhurk Could you help review this PR? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33968) Compute the number of subpartitions when initializing executon job vertices

2024-01-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33968:
---
Labels: pull-request-available  (was: )

> Compute the number of subpartitions when initializing executon job vertices
> ---
>
> Key: FLINK-33968
> URL: https://issues.apache.org/jira/browse/FLINK-33968
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Lijie Wang
>Assignee: Lijie Wang
>Priority: Major
>  Labels: pull-request-available
>
> Currently, when using dynamic graphs, the subpartition-num of a task is 
> lazily calculated until the task deployment moment, which may lead to some 
> uncertainties in job recovery scenarios:
> Before the JM crashes, when deploying upstream tasks, the parallelism of the 
> downstream vertex may be unknown, so the subpartition-num will be the max 
> parallelism of the downstream job vertex. However, after the JM restarts, when 
> deploying upstream tasks, the parallelism of the downstream job vertex may be 
> known (calculated before the JM crashed and recovered after the JM restarted), 
> so the subpartition-num will be the actual parallelism of the downstream job 
> vertex. This difference in the calculated subpartition-num means the partitions 
> generated before the JM crashed cannot be reused after the JM restarts.
> We will solve this problem by advancing the calculation of the subpartition-num 
> to the moment of initializing the execution job vertex (in the ctor of 
> IntermediateResultPartition)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32958) Support VIEW as a source table in CREATE TABLE ... Like statement

2024-01-02 Thread Han (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801993#comment-17801993
 ] 

Han commented on FLINK-32958:
-

[~martijnvisser]  sure.

spark doc: 
[https://spark.apache.org/docs/latest/sql-ref-syntax-ddl-create-table-like.html]

> The {{CREATE TABLE}} statement defines a new table using the 
> definition/metadata of an existing table or {*}view{*}.

 

hive doc: 
[https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-CreateTableLike]

> Before Hive 0.8.0, CREATE TABLE LIKE view_name would make a copy of the view. 
>In Hive 0.8.0 and later releases, CREATE TABLE LIKE view_name creates a table 
>by adopting the schema of view_name (fields and partition columns) using 
>defaults for SerDe and file formats.

 

They both support the "CREATE TABLE LIKE VIEW" syntax

> Support VIEW as a source table in CREATE TABLE ... Like statement
> -
>
> Key: FLINK-32958
> URL: https://issues.apache.org/jira/browse/FLINK-32958
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: Han
>Priority: Major
>  Labels: pull-request-available
>
> We can't create a table from a view through CREATE TABLE LIKE statement
>  
> case 1:
> {code:sql}
> create view source_view as select id,val from source;
> create table sink with ('connector' = 'print') like source_view (excluding 
> all);
> insert into sink select * from source_view;{code}
> case 2
> {code:java}
> DataStreamSource source = ...;
> tEnv.createTemporaryView("source", source);
> tEnv.executeSql("create table sink with ('connector' = 'print') like source 
> (excluding all)");
> tEnv.executeSql("insert into sink select * from source");{code}
>  
> The above cases will throw an exception:
> {code:java}
> Source table '`default_catalog`.`default_database`.`source`' of the LIKE 
> clause can not be a VIEW{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33968][runtime] Advance the calculation of num of subpartitions to the time of initializing execution job vertex [flink]

2024-01-02 Thread via GitHub


wanglijie95 opened a new pull request, #24019:
URL: https://github.com/apache/flink/pull/24019

   ## What is the purpose of the change
   Solve FLINK-33968
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (**no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**no**)
 - The serializers: (**no**)
 - The runtime per-record code paths (performance sensitive): (**no**)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (**no**)
 - The S3 file system connector: (**no**)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**no**)
 - If yes, how is the feature documented? (**not applicable**)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space

2024-01-02 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801992#comment-17801992
 ] 

Rui Fan commented on FLINK-33940:
-

Hi [~Zhanghao Chen], IIUC flink-benchmarks[1] checks state-related performance, 
and we can see the performance changes in this web UI[2].

For example, valueGet.HEAP[3][4] measures valueState.get for the hashmap state backend.

!image-2024-01-03-10-52-05-861.png|width=949,height=359!

 

[1] [https://github.com/apache/flink-benchmarks]


[2][http://flink-speed.xyz|http://flink-speed.xyz/]

[3][https://github.com/apache/flink-benchmarks/blob/master/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java]

[4]http://flink-speed.xyz/timeline/?ben=valueGet.HEAP&env=3

> Update the auto-derivation rule of max parallelism for enlarged upscaling 
> space
> ---
>
> Key: FLINK-33940
> URL: https://issues.apache.org/jira/browse/FLINK-33940
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
> Attachments: image-2024-01-03-10-52-05-861.png
>
>
> *Background*
> The choice of the max parallelism of a stateful operator is important, as it 
> limits the upper bound of the parallelism of the operator while it can also 
> add extra overhead when set too large. Currently, the max parallelism 
> of an operator is either fixed to a value specified by an API core / pipeline 
> option or auto-derived with the following rule:
> {{min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)}}
> *Problem*
> Recently, the elasticity of Flink jobs is becoming more and more valued by 
> users. The current auto-derived max parallelism was introduced a long time 
> ago and only allows the operator parallelism to be roughly doubled, which is 
> not desirable for elasticity. Setting a max parallelism manually may not be 
> desirable either: users may not have sufficient expertise to select a good 
> max-parallelism value.
> *Proposal*
> Update the auto-derivation rule of max parallelism to derive a larger max 
> parallelism for a better elasticity experience out of the box. A candidate is 
> as follows:
> {{min(max(roundUpToPowerOfTwo(operatorParallelism * {*}5{*}), {*}1024{*}), 
> 32767)}}
> Looking forward to your opinions on this.
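
For reference, a small sketch comparing the current rule with the proposed 
candidate quoted above. This is only an illustration of the two formulas, not 
Flink's actual KeyGroupRangeAssignment code, and the rounding helper is a 
simplified stand-in:

{code:java}
// Editorial illustration of the two formulas above; not Flink's KeyGroupRangeAssignment code.
public class MaxParallelismRuleSketch {

    private static final int UPPER_BOUND = 32767;

    // Simplified stand-in for roundUpToPowerOfTwo: smallest power of two >= x.
    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    // Current rule: min(max(roundUpToPowerOfTwo(parallelism * 1.5), 128), 32767)
    static int currentRule(int parallelism) {
        return Math.min(
                Math.max(roundUpToPowerOfTwo((int) Math.ceil(parallelism * 1.5)), 128), UPPER_BOUND);
    }

    // Proposed candidate: min(max(roundUpToPowerOfTwo(parallelism * 5), 1024), 32767)
    static int proposedRule(int parallelism) {
        return Math.min(Math.max(roundUpToPowerOfTwo(parallelism * 5), 1024), UPPER_BOUND);
    }

    public static void main(String[] args) {
        for (int p : new int[] {10, 100, 500, 2000}) {
            System.out.printf("p=%d current=%d proposed=%d%n", p, currentRule(p), proposedRule(p));
        }
    }
}
{code}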



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space

2024-01-02 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-33940:

Attachment: image-2024-01-03-10-52-05-861.png

> Update the auto-derivation rule of max parallelism for enlarged upscaling 
> space
> ---
>
> Key: FLINK-33940
> URL: https://issues.apache.org/jira/browse/FLINK-33940
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
> Attachments: image-2024-01-03-10-52-05-861.png
>
>
> *Background*
> The choice of the max parallelism of a stateful operator is important, as it 
> limits the upper bound of the parallelism of the operator while it can also 
> add extra overhead when set too large. Currently, the max parallelism 
> of an operator is either fixed to a value specified by an API core / pipeline 
> option or auto-derived with the following rule:
> {{min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)}}
> *Problem*
> Recently, the elasticity of Flink jobs is becoming more and more valued by 
> users. The current auto-derived max parallelism was introduced a long time 
> ago and only allows the operator parallelism to be roughly doubled, which is 
> not desirable for elasticity. Setting a max parallelism manually may not be 
> desirable either: users may not have sufficient expertise to select a good 
> max-parallelism value.
> *Proposal*
> Update the auto-derivation rule of max parallelism to derive a larger max 
> parallelism for a better elasticity experience out of the box. A candidate is 
> as follows:
> {{min(max(roundUpToPowerOfTwo(operatorParallelism * {*}5{*}), {*}1024{*}), 
> 32767)}}
> Looking forward to your opinions on this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33968) Compute the number of subpartitions when initializing executon job vertices

2024-01-02 Thread Lijie Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Wang updated FLINK-33968:
---
Description: 
Currently, when using dynamic graphs, the subpartition-num of a task is lazily 
calculated until the task deployment moment, which may lead to some 
uncertainties in job recovery scenarios:

Before the JM crashes, when deploying upstream tasks, the parallelism of the 
downstream vertex may be unknown, so the subpartition-num will be the max 
parallelism of the downstream job vertex. However, after the JM restarts, when 
deploying upstream tasks, the parallelism of the downstream job vertex may be 
known (calculated before the JM crashed and recovered after the JM restarted), 
so the subpartition-num will be the actual parallelism of the downstream job 
vertex. This difference in the calculated subpartition-num means the partitions 
generated before the JM crashed cannot be reused after the JM restarts.

We will solve this problem by advancing the calculation of the subpartition-num 
to the moment of initializing the execution job vertex (in the ctor of 
IntermediateResultPartition)

  was:
Currently, when using dynamic graphs, the subpartition-num of a task is lazily 
calculated until the task deployment moment, which may lead to some 
uncertainties in job recovery scenarios.

Before the JM crashes, when deploying upstream tasks, the parallelism of the 
downstream vertex may be unknown, so the subpartition-num will be the max 
parallelism of the downstream job vertex. However, after the JM restarts, when 
deploying upstream tasks, the parallelism of the downstream job vertex may be 
known (calculated before the JM crashed and recovered after the JM restarted), 
so the subpartition-num will be the actual parallelism of the downstream job vertex.
 
This difference in the calculated subpartition-num means the partitions 
generated before the JM crashed cannot be reused after the JM restarts.

We will solve this problem by advancing the calculation of the subpartition-num to 
the moment of initializing the execution job vertex (in the ctor of 
IntermediateResultPartition)


> Compute the number of subpartitions when initializing executon job vertices
> ---
>
> Key: FLINK-33968
> URL: https://issues.apache.org/jira/browse/FLINK-33968
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Lijie Wang
>Assignee: Lijie Wang
>Priority: Major
>
> Currently, when using dynamic graphs, the subpartition-num of a task is 
> lazily calculated until the task deployment moment, which may lead to some 
> uncertainties in job recovery scenarios:
> Before the JM crashes, when deploying upstream tasks, the parallelism of the 
> downstream vertex may be unknown, so the subpartition-num will be the max 
> parallelism of the downstream job vertex. However, after the JM restarts, when 
> deploying upstream tasks, the parallelism of the downstream job vertex may be 
> known (calculated before the JM crashed and recovered after the JM restarted), 
> so the subpartition-num will be the actual parallelism of the downstream job 
> vertex. This difference in the calculated subpartition-num means the partitions 
> generated before the JM crashed cannot be reused after the JM restarts.
> We will solve this problem by advancing the calculation of the subpartition-num 
> to the moment of initializing the execution job vertex (in the ctor of 
> IntermediateResultPartition)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33968) Compute the number of subpartitions when initializing executon job vertices

2024-01-02 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-33968:
--

 Summary: Compute the number of subpartitions when initializing 
executon job vertices
 Key: FLINK-33968
 URL: https://issues.apache.org/jira/browse/FLINK-33968
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Lijie Wang
Assignee: Lijie Wang


Currently, when using dynamic graphs, the subpartition-num of a task is lazily 
calculated until the task deployment moment, which may lead to some 
uncertainties in job recovery scenarios.

Before the JM crashes, when deploying upstream tasks, the parallelism of the 
downstream vertex may be unknown, so the subpartition-num will be the max 
parallelism of the downstream job vertex. However, after the JM restarts, when 
deploying upstream tasks, the parallelism of the downstream job vertex may be 
known (calculated before the JM crashed and recovered after the JM restarted), 
so the subpartition-num will be the actual parallelism of the downstream job vertex.
 
This difference in the calculated subpartition-num means the partitions 
generated before the JM crashed cannot be reused after the JM restarts.

We will solve this problem by advancing the calculation of the subpartition-num to 
the moment of initializing the execution job vertex (in the ctor of 
IntermediateResultPartition)
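
A small illustrative sketch of the failover mismatch described above (simplified 
pseudologic only, not the actual IntermediateResultPartition code):

{code:java}
import java.util.OptionalInt;

// Illustrative pseudologic only, not the actual IntermediateResultPartition code.
public class SubpartitionNumSketch {

    // Lazy rule: use the downstream parallelism if already decided, otherwise its max parallelism.
    static int lazySubpartitionNum(OptionalInt downstreamParallelism, int downstreamMaxParallelism) {
        return downstreamParallelism.orElse(downstreamMaxParallelism);
    }

    public static void main(String[] args) {
        int downstreamMax = 128;

        // Before the JM crash, the downstream parallelism is still undecided...
        int beforeFailover = lazySubpartitionNum(OptionalInt.empty(), downstreamMax);

        // ...after recovery it is known (say 32), so the same upstream task derives a
        // different subpartition count and its old partitions cannot be reused.
        int afterFailover = lazySubpartitionNum(OptionalInt.of(32), downstreamMax);

        System.out.println(beforeFailover + " vs " + afterFailover); // 128 vs 32
    }
}
{code}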



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33949) METHOD_ABSTRACT_NOW_DEFAULT should be both source compatible and binary compatible

2024-01-02 Thread Wencong Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wencong Liu closed FLINK-33949.
---
Resolution: Not A Problem

> METHOD_ABSTRACT_NOW_DEFAULT should be both source compatible and binary 
> compatible
> --
>
> Key: FLINK-33949
> URL: https://issues.apache.org/jira/browse/FLINK-33949
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure
>Affects Versions: 1.19.0
>Reporter: Wencong Liu
>Priority: Major
> Fix For: 1.19.0
>
>
> Currently I'm trying to refactor some APIs annotated by @Public in 
> [FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs - 
> Apache Flink - Apache Software 
> Foundation|https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs].
>  When an abstract method is changed into a default method, the japicmp maven 
> plugin names this change METHOD_ABSTRACT_NOW_DEFAULT and considers it 
> source incompatible and binary incompatible.
> The reason may be that if the abstract method becomes default, the logic in 
> the default method will be ignored by previous implementations.
> I created a test case in which a job is compiled with the newly changed default 
> method and submitted to the previous version. No exception is thrown. 
> Therefore, METHOD_ABSTRACT_NOW_DEFAULT shouldn't be incompatible for either 
> source or binary. We could add the following settings to override the 
> default values for binary and source compatibility, such as:
> {code:java}
> 
> 
>METHOD_ABSTRACT_NOW_DEFAULT
>true
>true
> 
>  {code}
> By the way, currently the master branch checks both source compatibility and 
> binary compatibility between minor versions. According to Flink's API 
> compatibility constraints, the master branch shouldn't check binary 
> compatibility. There is already jira FLINK-33009 to track it and we should 
> fix it as soon as possible.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33892) FLIP-383: Support Job Recovery for Batch Jobs

2024-01-02 Thread Lijie Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Wang reassigned FLINK-33892:
--

Assignee: Junrui Li  (was: Lijie Wang)

> FLIP-383: Support Job Recovery for Batch Jobs
> -
>
> Key: FLINK-33892
> URL: https://issues.apache.org/jira/browse/FLINK-33892
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Reporter: Lijie Wang
>Assignee: Junrui Li
>Priority: Major
>
> This is the umbrella ticket for 
> [FLIP-383|https://cwiki.apache.org/confluence/display/FLINK/FLIP-383%3A+Support+Job+Recovery+for+Batch+Jobs]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33935][checkpoint] Improve the default value logic related to `execution.checkpointing.tolerable-failed-checkpoints` and `state.backend.type` [flink]

2024-01-02 Thread via GitHub


Zakelly commented on code in PR #23987:
URL: https://github.com/apache/flink/pull/23987#discussion_r1440016826


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java:
##
@@ -1040,8 +1041,9 @@ public void configure(ReadableConfig configuration, 
ClassLoader classLoader) {
 configuration
 .getOptional(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)
 .ifPresent(this::enableChangelogStateBackend);
-Optional.ofNullable(loadStateBackend(configuration, classLoader))
-.ifPresent(this::setStateBackend);
+configuration
+.getOptional(StateBackendOptions.STATE_BACKEND)
+.ifPresent(conf -> 
setStateBackend(loadStateBackend(configuration, classLoader)));

Review Comment:
   @1996fanrui Thanks for your explanation! I missed the point of 
`getOptional`, so I have no more questions. +1 from my side.
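
   For readers skimming the thread, a small sketch of the `get()` vs 
`getOptional()` distinction, using a stand-in option rather than the real 
`state.backend.type` wiring: `get()` falls back to the option's default value, 
while `getOptional()` stays empty unless the user explicitly set the key, which 
is what lets the new code call `setStateBackend` only for explicit configuration.

   ```java
   import java.util.Optional;

   import org.apache.flink.configuration.ConfigOption;
   import org.apache.flink.configuration.ConfigOptions;
   import org.apache.flink.configuration.Configuration;

   /** Sketch of get() vs getOptional() with a stand-in option; not the real state backend wiring. */
   public class GetOptionalSketch {

       // Stand-in option with a default value, mimicking an option like state.backend.type.
       private static final ConfigOption<String> BACKEND =
               ConfigOptions.key("example.backend").stringType().defaultValue("hashmap");

       public static void main(String[] args) {
           Configuration conf = new Configuration(); // the user set nothing

           String withDefault = conf.get(BACKEND);                 // "hashmap": default kicks in
           Optional<String> explicit = conf.getOptional(BACKEND);  // Optional.empty(): no explicit value

           System.out.println(withDefault + " / " + explicit);
           // Guarding with getOptional(...).ifPresent(...) therefore means setStateBackend is
           // only invoked when a backend was explicitly configured.
       }
   }
   ```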



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33949) METHOD_ABSTRACT_NOW_DEFAULT should be both source compatible and binary compatible

2024-01-02 Thread Wencong Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801988#comment-17801988
 ] 

Wencong Liu commented on FLINK-33949:
-

Thanks [~chesnay] for the explanation. Given that actively running code might 
throw related exceptions, it would be unreasonable to directly modify the 
japicmp rules. If a specific interface needs to break this rule, we should 
simply exclude that interface. This ticket can be closed now.

> METHOD_ABSTRACT_NOW_DEFAULT should be both source compatible and binary 
> compatible
> --
>
> Key: FLINK-33949
> URL: https://issues.apache.org/jira/browse/FLINK-33949
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure
>Affects Versions: 1.19.0
>Reporter: Wencong Liu
>Priority: Major
> Fix For: 1.19.0
>
>
> Currently I'm trying to refactor some APIs annotated by @Public in 
> [FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs - 
> Apache Flink - Apache Software 
> Foundation|https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs].
>  When an abstract method is changed into a default method, the japicmp maven 
> plugin names this change METHOD_ABSTRACT_NOW_DEFAULT and considers it 
> source incompatible and binary incompatible.
> The reason may be that if the abstract method becomes default, the logic in 
> the default method will be ignored by previous implementations.
> I created a test case in which a job is compiled with the newly changed default 
> method and submitted to the previous version. No exception is thrown. 
> Therefore, METHOD_ABSTRACT_NOW_DEFAULT shouldn't be incompatible for either 
> source or binary. We could add the following settings to override the 
> default values for binary and source compatibility, such as:
> {code:java}
> 
> 
>METHOD_ABSTRACT_NOW_DEFAULT
>true
>true
> 
>  {code}
> By the way, currently the master branch checks both source compatibility and 
> binary compatibility between minor versions. According to Flink's API 
> compatibility constraints, the master branch shouldn't check binary 
> compatibility. There is already jira FLINK-33009 to track it and we should 
> fix it as soon as possible.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space

2024-01-02 Thread Zhanghao Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801987#comment-17801987
 ] 

Zhanghao Chen commented on FLINK-33940:
---

[~mxm] thanks for the quick response. I'll follow up with some Nexmark benchmark 
results on the performance impact of bumping the number of key groups, so we can 
make an informed decision. As for the factor, 10 sounds good to me; maybe let's 
also see what [~fanrui] & [~gyfora] think about it.

> Update the auto-derivation rule of max parallelism for enlarged upscaling 
> space
> ---
>
> Key: FLINK-33940
> URL: https://issues.apache.org/jira/browse/FLINK-33940
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>
> *Background*
> The choice of the max parallelism of a stateful operator is important, as it 
> limits the upper bound of the parallelism of the operator while it can also 
> add extra overhead when set too large. Currently, the max parallelism 
> of an operator is either fixed to a value specified by an API core / pipeline 
> option or auto-derived with the following rule:
> {{min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)}}
> *Problem*
> Recently, the elasticity of Flink jobs is becoming more and more valued by 
> users. The current auto-derived max parallelism was introduced a long time 
> ago and only allows the operator parallelism to be roughly doubled, which is 
> not desirable for elasticity. Setting a max parallelism manually may not be 
> desirable either: users may not have sufficient expertise to select a good 
> max-parallelism value.
> *Proposal*
> Update the auto-derivation rule of max parallelism to derive a larger max 
> parallelism for a better elasticity experience out of the box. A candidate is 
> as follows:
> {{min(max(roundUpToPowerOfTwo(operatorParallelism * {*}5{*}), {*}1024{*}), 
> 32767)}}
> Looking forward to your opinions on this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33792] Generate the same code for the same logic [flink]

2024-01-02 Thread via GitHub


zoudan commented on code in PR #23984:
URL: https://github.com/apache/flink/pull/23984#discussion_r1440008761


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala:
##
@@ -124,14 +124,27 @@ object CodeGenUtils {
 
   private val nameCounter = new AtomicLong
 
-  def newName(name: String): String = {
-s"$name$$${nameCounter.getAndIncrement}"
+  def newName(context: CodeGeneratorContext = null, name: String): String = {

Review Comment:
   Get it, removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-33966) Fix the getNumRecordsInPerSecond Utility Function

2024-01-02 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan reassigned FLINK-33966:
---

Assignee: Yang LI  (was: Li Yang)

> Fix the getNumRecordsInPerSecond Utility Function
> -
>
> Key: FLINK-33966
> URL: https://issues.apache.org/jira/browse/FLINK-33966
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Yang LI
>Assignee: Yang LI
>Priority: Minor
>  Labels: pull-request-available
>
> We have the following code in the codebase:
> {code:java}
> if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
>     numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
> }{code}
> {code:java}
> if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
>     numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
> }{code}
> i.e. the same condition is checked twice.
>  
> {*}Definition of done{*}: 
> Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} 
> to {{if (!isSource && ...)}}. This addresses the redundant check and ensures 
> correct metric fetching for non-source operators.
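For clarity, a minimal sketch of the change described in the definition of done (it mirrors the snippet quoted above; the surrounding method and fields are omitted): only the guard of the second branch changes, so it now fires for non-source vertices:

{code:java}
// First branch (unchanged): for sources, fall back to the source task's numRecordsIn metric.
if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
    numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
}
// Second branch (fixed): previously also guarded by isSource, now restricted to
// non-source vertices as the issue describes.
if (!isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
    numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
}
{code}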



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33966) Fix the getNumRecordsInPerSecond Utility Function

2024-01-02 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan reassigned FLINK-33966:
---

Assignee: Li Yang

> Fix the getNumRecordsInPerSecond Utility Function
> -
>
> Key: FLINK-33966
> URL: https://issues.apache.org/jira/browse/FLINK-33966
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Yang LI
>Assignee: Li Yang
>Priority: Minor
>  Labels: pull-request-available
>
> We have the following code in the codebase:
> {code:java}
> if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
>     numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
> }{code}
> {code:java}
> if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
>     numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
> }{code}
> i.e. the same condition is checked twice.
>  
> {*}Definition of done{*}: 
> Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} 
> to {{if (!isSource && ...)}}. This addresses the redundant check and ensures 
> correct metric fetching for non-source operators.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33792] Generate the same code for the same logic [flink]

2024-01-02 Thread via GitHub


zoudan commented on PR #23984:
URL: https://github.com/apache/flink/pull/23984#issuecomment-1874770437

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33962) Chaining-agnostic OperatorID generation for improved state compatibility on parallelism change

2024-01-02 Thread Zhanghao Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801984#comment-17801984
 ] 

Zhanghao Chen commented on FLINK-33962:
---

Thanks for the quick response. I'll draft a FLIP and raise a discussion first.

> Chaining-agnostic OperatorID generation for improved state compatibility on 
> parallelism change
> --
>
> Key: FLINK-33962
> URL: https://issues.apache.org/jira/browse/FLINK-33962
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Zhanghao Chen
>Priority: Major
>
> *Background*
> Flink restores operator state from snapshots based on matching operatorIDs. 
> Since Flink 1.2, {{StreamGraphHasherV2}} is used for operatorID generation 
> when no user-set uid exists. The generated OperatorID is deterministic with 
> respect to:
>  * node-local properties (the traverse ID in the BFS for the stream graph)
>  * chained output nodes
>  * input node hashes
> *Problem*
> The chaining behavior affects state compatibility, as the generation of the 
> OperatorID of an op depends on its chained output nodes. For example, a 
> simple source->sink DAG with source and sink chained together is 
> state-incompatible with an otherwise identical DAG with source and sink 
> unchained (either because the parallelisms of the two ops are changed to be 
> unequal or because chaining is disabled). This greatly limits the flexibility 
> to perform chain-breaking/joining for performance tuning.
> *Proposal*
> Introduce {{StreamGraphHasherV3}}, which is agnostic to the chaining behavior 
> of operators and effectively just removes L227-235 of 
> [flink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
>  at master · apache/flink 
> (github.com)|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java].
>  
> This will not hurt the determinism of the ID generation across job 
> submissions as long as the stream graph topology doesn't change, and since 
> new versions of Flink have already adopted pure operator-level state 
> recovery, this will not break state recovery across job submissions as long 
> as both submissions use the same hasher.
> This will, however, break cross-version state compatibility, so we can 
> introduce a new option to enable HasherV3 in v1.19 and consider making it the 
> default hasher in v2.0.
> Looking forward to suggestions on this.
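To make the proposal more concrete, here is a minimal, hypothetical sketch of a chaining-agnostic hash; this is not the actual StreamGraphHasherV2/V3 code, and the helper signature and use of Guava's murmur3 hasher are assumptions for illustration. The point is that only the node-local traverse ID and the upstream hashes are folded in, while chained output nodes are ignored, so breaking or joining a chain leaves the generated OperatorID unchanged:

{code:java}
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import java.util.List;

final class ChainingAgnosticHashSketch {

    static byte[] chainingAgnosticHash(int traverseId, List<byte[]> inputNodeHashes) {
        Hasher hasher = Hashing.murmur3_128(0).newHasher();
        hasher.putInt(traverseId); // node-local property
        for (byte[] inputHash : inputNodeHashes) {
            hasher.putBytes(inputHash); // contribution of the input nodes
        }
        // Deliberately no contribution from chained output nodes, which is the
        // essence of the proposed chaining-agnostic behavior.
        return hasher.hash().asBytes();
    }
}
{code}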



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [BP][FLINK-33964][pulsar][docs] Drop admin docs reference [flink-connector-pulsar]

2024-01-02 Thread via GitHub


leonardBang merged PR #76:
URL: https://github.com/apache/flink-connector-pulsar/pull/76


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33962) Chaining-agnostic OperatorID generation for improved state compatibility on parallelism change

2024-01-02 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801976#comment-17801976
 ] 

Xintong Song commented on FLINK-33962:
--

Thanks for reaching out, [~Zhanghao Chen].

Just some quick responses; I'd need to look a bit more into the related 
components before giving further comments.

Based on your description, I think it generally makes sense to make operator 
ID generation independent of chaining. However, as you have already mentioned, 
this is a breaking change that may result in state incompatibility. Therefore, 
I think it deserves a FLIP discussion and an official vote.

> Chaining-agnostic OperatorID generation for improved state compatibility on 
> parallelism change
> --
>
> Key: FLINK-33962
> URL: https://issues.apache.org/jira/browse/FLINK-33962
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Zhanghao Chen
>Priority: Major
>
> *Background*
> Flink restores operator state from snapshots based on matching operatorIDs. 
> Since Flink 1.2, {{StreamGraphHasherV2}} is used for operatorID generation 
> when no user-set uid exists. The generated OperatorID is deterministic with 
> respect to:
>  * node-local properties (the traverse ID in the BFS for the stream graph)
>  * chained output nodes
>  * input node hashes
> *Problem*
> The chaining behavior affects state compatibility, as the generation of the 
> OperatorID of an op depends on its chained output nodes. For example, a 
> simple source->sink DAG with source and sink chained together is 
> state-incompatible with an otherwise identical DAG with source and sink 
> unchained (either because the parallelisms of the two ops are changed to be 
> unequal or because chaining is disabled). This greatly limits the flexibility 
> to perform chain-breaking/joining for performance tuning.
> *Proposal*
> Introduce {{StreamGraphHasherV3}}, which is agnostic to the chaining behavior 
> of operators and effectively just removes L227-235 of 
> [flink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
>  at master · apache/flink 
> (github.com)|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java].
>  
> This will not hurt the determinism of the ID generation across job 
> submissions as long as the stream graph topology doesn't change, and since 
> new versions of Flink have already adopted pure operator-level state 
> recovery, this will not break state recovery across job submissions as long 
> as both submissions use the same hasher.
> This will, however, break cross-version state compatibility, so we can 
> introduce a new option to enable HasherV3 in v1.19 and consider making it the 
> default hasher in v2.0.
> Looking forward to suggestions on this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33967) Remove/Rename log4j2-test.properties in flink-streaming-java's test bundle

2024-01-02 Thread Koala Lam (Jira)
Koala Lam created FLINK-33967:
-

 Summary: Remove/Rename log4j2-test.properties in 
flink-streaming-java's test bundle
 Key: FLINK-33967
 URL: https://issues.apache.org/jira/browse/FLINK-33967
 Project: Flink
  Issue Type: Improvement
Reporter: Koala Lam


This file, when on the test classpath, is picked up automatically by Log4j2. In 
order to reliably use our own log4j2 test config, we have to specify the system 
property "log4j2.configurationFile", which is not ideal as we have to set it 
manually in the IDE config.

https://logging.apache.org/log4j/2.x/manual/configuration.html
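As a concrete illustration of the workaround mentioned above (the class name and config path are hypothetical), the property can also be set programmatically before any logger is created, which is equivalent to passing -Dlog4j2.configurationFile=... on the JVM command line or in the IDE run configuration:

{code:java}
public final class TestLoggingSetup {

    private TestLoggingSetup() {}

    /** Points Log4j2 at our own config instead of a log4j2-test.properties found on the classpath. */
    public static void useOwnLog4j2Config() {
        System.setProperty(
                "log4j2.configurationFile",
                "src/test/resources/my-log4j2-test.properties"); // hypothetical path
    }
}
{code}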



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33966) Fix the getNumRecordsInPerSecond Utility Function

2024-01-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33966:
---
Labels: pull-request-available  (was: )

> Fix the getNumRecordsInPerSecond Utility Function
> -
>
> Key: FLINK-33966
> URL: https://issues.apache.org/jira/browse/FLINK-33966
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Yang LI
>Priority: Minor
>  Labels: pull-request-available
>
> We have the following code in the codebase:
> {code:java}
> if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
>     numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
> }{code}
> {code:java}
> if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
>     numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
> }{code}
> i.e. the same condition is checked twice.
>  
> {*}Definition of done{*}: 
> Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} 
> to {{if (!isSource && ...)}}. This addresses the redundant check and ensures 
> correct metric fetching for non-source operators.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33966] Fix the getNumRecordsInPerSecond Utility Function [flink-kubernetes-operator]

2024-01-02 Thread via GitHub


Yang-LI-CS opened a new pull request, #743:
URL: https://github.com/apache/flink-kubernetes-operator/pull/743

   
   
   ## What is the purpose of the change
   
   Fix the getNumRecordsInPerSecond utility function to address the redundant 
check and ensure correct metric fetching for non-source operators.
   
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
no
 - Core observer or reconciler logic that is regularly executed: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33966) Fix the getNumRecordsInPerSecond Utility Function

2024-01-02 Thread Yang LI (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang LI updated FLINK-33966:

Description: 
We have the following code in the codebase:
{code:java}
if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
    numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
}{code}
{code:java}
if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
    numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
}{code}
i.e. the same condition is checked twice.

 

{*}Definition of done{*}: 

Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} to {{if (!isSource && ...)}}. This addresses the redundant check and ensures correct metric fetching for non-source operators.

  was:
We have the same condition check twice in the function getNumRecordsInPerSecond 
([L220|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L220]
 and 
[L224|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L224])

 

{*}Definition of done{*}: 

Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} to {{if (!isSource && ...)}}. This addresses the redundant check and ensures correct metric fetching for non-source operators.


> Fix the getNumRecordsInPerSecond Utility Function
> -
>
> Key: FLINK-33966
> URL: https://issues.apache.org/jira/browse/FLINK-33966
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Yang LI
>Priority: Minor
>
> We have the following code in the codebase:
> {code:java}
> if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
>     numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
> }{code}
> {code:java}
> if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
>     numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
> }{code}
> i.e. the same condition is checked twice.
>  
> {*}Definition of done{*}: 
> Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} 
> to {{if (!isSource && ...)}}. This addresses the redundant check and ensures 
> correct metric fetching for non-source operators.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33966) Fix the getNumRecordsInPerSecond Utility Function

2024-01-02 Thread Yang LI (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang LI updated FLINK-33966:

Description: 
We have the same condition check twice in the function getNumRecordsInPerSecond 
([L220|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L220]
 and 
[L224|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L224])

 

{*}Definition of done{*}: 

Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} to {{if (!isSource && ...)}}. This addresses the redundant check and ensures correct metric fetching for non-source operators.

  was:
We have the same condition check twice in the function getNumRecordsInPerSecond 
([L220|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L220]
 and 
[L224|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L224])

Definition of done: 
Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} to {{if (!isSource && ...)}}. This addresses the redundant check and ensures correct metric fetching for non-source operators.


> Fix the getNumRecordsInPerSecond Utility Function
> -
>
> Key: FLINK-33966
> URL: https://issues.apache.org/jira/browse/FLINK-33966
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Yang LI
>Priority: Minor
>
> We have the same condition check twice in the function getNumRecordsInPerSecond 
> ([L220|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L220]
>  and 
> [L224|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L224])
>  
> {*}Definition of done{*}: 
> Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} 
> to {{if (!isSource && ...)}}. This addresses the redundant check and ensures 
> correct metric fetching for non-source operators.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33966) Fix the getNumRecordsInPerSecond Utility Function

2024-01-02 Thread Yang LI (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang LI updated FLINK-33966:

Description: 
We have the same condition check twice in the function getNumRecordsInPerSecond 
([L220|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L220]
 and 
[L224|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L224])

Definition of done: 
Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} to {{if (!isSource && ...)}}. This addresses the redundant check and ensures correct metric fetching for non-source operators.

  was:
We have the same condition check twice in the function getNumRecordsInPerSecond 
([L220|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L220]
 and 
[L224|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L224])

Action: 
Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} to {{if (!isSource && ...)}}. This addresses the redundant check and ensures correct metric fetching for non-source operators.


> Fix the getNumRecordsInPerSecond Utility Function
> -
>
> Key: FLINK-33966
> URL: https://issues.apache.org/jira/browse/FLINK-33966
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Yang LI
>Priority: Minor
>
> We have the same condition check twice in the function getNumRecordsInPerSecond 
> ([L220|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L220]
>  and 
> [L224|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L224])
> Definition of done: 
> Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} 
> to {{if (!isSource && ...)}}. This addresses the redundant check and ensures 
> correct metric fetching for non-source operators.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33966) Fix the getNumRecordsInPerSecond Utility Function

2024-01-02 Thread Yang LI (Jira)
Yang LI created FLINK-33966:
---

 Summary: Fix the getNumRecordsInPerSecond Utility Function
 Key: FLINK-33966
 URL: https://issues.apache.org/jira/browse/FLINK-33966
 Project: Flink
  Issue Type: Bug
  Components: Autoscaler
Affects Versions: kubernetes-operator-1.7.0
Reporter: Yang LI


We have the same condition check twice in the function getNumRecordsInPerSecond 
([L220|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L220]
 and 
[L224|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L224])

Action: 
Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} to {{if (!isSource && ...)}}. This addresses the redundant check and ensures correct metric fetching for non-source operators.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-02 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1438684576


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -96,28 +97,115 @@ public JdbcDynamicTableSource(
 public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
 // JDBC only support non-nested look up keys
 String[] keyNames = new String[context.getKeys().length];
+
 for (int i = 0; i < keyNames.length; i++) {
 int[] innerKeyArr = context.getKeys()[i];
 Preconditions.checkArgument(
 innerKeyArr.length == 1, "JDBC only support non-nested 
look up keys");
 keyNames[i] = 
DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
 }
+
 final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
+String[] conditions = null;
+
+if (this.resolvedPredicates != null) {
+conditions = new String[this.resolvedPredicates.size()];
+int processedPushdownParamsIndex = 0;
+for (int i = 0; i < this.resolvedPredicates.size(); i++) {
+String resolvedPredicate = this.resolvedPredicates.get(i);
+
+/*
+ * This replace seems like it should be using a Flink class to 
resolve the parameter. It does not
+ * effect the dialects as the placeholder comes from 
JdbcFilterPushdownPreparedStatementVisitor.
+ *
+ * Here is what has been considered as alternatives.
+ *
+ * We cannot use the way this is done in 
getScanRuntimeProvider, as the index we have is the index
+ * into the filters, but it needs the index into the fields. 
For example one lookup key and one filter
+ * would both have an index of 0, which the subsequent code 
would incorrectly resolve to the first
+ * field.
+ * We cannot use the PreparedStatement as we have not got 
access to the statement here.
+ * We cannot use ParameterizedPredicate as it takes the filter 
expression as input (e.g EQUALS(...)
+ * not the form we have here an example would be ('field1'= ?).
+ *
+ * An entry in the resolvedPredicates list may have more than 
one associated pushdown parameter, for example
+ * a query like this : ... on e.type = 2 and (e.age = 50 OR 
height > 90)  and a.ip = e.ip;
+ * will have 2 resolvedPredicates and 3 pushdownParams. The 
2nd and 3rd pushdownParams will be for the second
+ * resolvedPredicate.
+ *
+ */
+ArrayList paramsForThisPredicate = new ArrayList();
+char placeholderChar =
+
JdbcFilterPushdownPreparedStatementVisitor.PUSHDOWN_PREDICATE_PLACEHOLDER
+.charAt(0);
+
+int count =
+(int) resolvedPredicate.chars().filter(ch -> ch == 
placeholderChar).count();
+
+for (int j = processedPushdownParamsIndex;
+j < processedPushdownParamsIndex + count;
+j++) {
+
paramsForThisPredicate.add(this.pushdownParams[j].toString());
+}
+processedPushdownParamsIndex = processedPushdownParamsIndex + 
count;

Review Comment:
   @snuyanzin Yes I had not thought of that. I have reviewed this. I see 3 
options I am looking to explore:

   1. Amend the string processing to cope with the scenario you have 
highlighted. This is tricky, as I will need to parse the string looking for 
simple binary operators and OR operators that link many simple expressions, 
when there might be quotes in the column names. This seems overly complicated 
and fragile.
   2. Investigate whether I can change how `this.pushdownParams` and 
`this.resolvedPredicates` are populated, or add more instance variables, so 
they can be processed simply without complex string logic. 
   3. Remove the pushdown capability for lookup joins from this connector to 
enable the release. I think this is looking more attractive as the pragmatic 
approach, given what we are seeing.
   
   I hope 2. will be fruitful.  
   
  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space

2024-01-02 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801843#comment-17801843
 ] 

Maximilian Michels commented on FLINK-33940:


[~Zhanghao Chen] Even though the factor only affects high-parallelism operators 
(> 840), I wonder whether we need to leave more room for scale-up. But I don't 
have a strong opinion.

{quote}
IIUC, when the parallelism of one job is very small (it's 1 or 2) and the max 
parallelism is 1024, one subtask will have 1024 keyGroups. From the state 
backend side, too many key groups may affect performance. (This is my concern 
about changing it by default in the Flink community.)
{quote}

[~fanrui] I think we need to find out how big the performance impact actually 
is when jumping from 128 to 840 key groups. But 128 may just have been a very 
conservative number.
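As a rough way to reason about that impact (plain arithmetic, not Flink code): each subtask is responsible for roughly maxParallelism / parallelism key groups, so raising the default floor mainly matters for low-parallelism jobs:

{code:java}
// Approximate number of key groups handled by each subtask.
static int approxKeyGroupsPerSubtask(int maxParallelism, int parallelism) {
    return (int) Math.ceil((double) maxParallelism / parallelism);
}

// Example for a job with parallelism 2:
//   approxKeyGroupsPerSubtask(128, 2)  ->  64  (current default floor)
//   approxKeyGroupsPerSubtask(840, 2)  -> 420
//   approxKeyGroupsPerSubtask(1024, 2) -> 512  (proposed floor)
{code}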

> Update the auto-derivation rule of max parallelism for enlarged upscaling 
> space
> ---
>
> Key: FLINK-33940
> URL: https://issues.apache.org/jira/browse/FLINK-33940
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>
> *Background*
> The choice of the max parallelism of a stateful operator is important, as it 
> limits the upper bound of the operator's parallelism while also adding extra 
> overhead when set too large. Currently, the max parallelism of an operator is 
> either fixed to a value specified via the API / pipeline option or 
> auto-derived with the following rule:
> {{min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)}}
> *Problem*
> Recently, the elasticity of Flink jobs has become more and more valued by 
> users. The current auto-derived max parallelism was introduced a long time ago 
> and only allows the operator parallelism to be roughly doubled, which is not 
> desirable for elasticity. Setting a max parallelism manually may not be 
> desirable either: users may not have sufficient expertise to select a good 
> max-parallelism value.
> *Proposal*
> Update the auto-derivation rule of max parallelism to derive a larger max 
> parallelism for a better out-of-the-box elasticity experience. A candidate is 
> as follows:
> {{min(max(roundUpToPowerOfTwo(operatorParallelism * {*}5{*}), {*}1024{*}), 
> 32767)}}
> Looking forward to your opinions on this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-20454][format][debezium] Allow to read metadata for debezium-avro-confluent format [flink]

2024-01-02 Thread via GitHub


creativedutchmen commented on PR #18744:
URL: https://github.com/apache/flink/pull/18744#issuecomment-1874226714

   This would be super helpful to have merged. Are there any plans on pushing 
this forward?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-02 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1438684576


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -96,28 +97,115 @@ public JdbcDynamicTableSource(
 public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
 // JDBC only support non-nested look up keys
 String[] keyNames = new String[context.getKeys().length];
+
 for (int i = 0; i < keyNames.length; i++) {
 int[] innerKeyArr = context.getKeys()[i];
 Preconditions.checkArgument(
 innerKeyArr.length == 1, "JDBC only support non-nested 
look up keys");
 keyNames[i] = 
DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
 }
+
 final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
+String[] conditions = null;
+
+if (this.resolvedPredicates != null) {
+conditions = new String[this.resolvedPredicates.size()];
+int processedPushdownParamsIndex = 0;
+for (int i = 0; i < this.resolvedPredicates.size(); i++) {
+String resolvedPredicate = this.resolvedPredicates.get(i);
+
+/*
+ * This replace seems like it should be using a Flink class to 
resolve the parameter. It does not
+ * effect the dialects as the placeholder comes from 
JdbcFilterPushdownPreparedStatementVisitor.
+ *
+ * Here is what has been considered as alternatives.
+ *
+ * We cannot use the way this is done in 
getScanRuntimeProvider, as the index we have is the index
+ * into the filters, but it needs the index into the fields. 
For example one lookup key and one filter
+ * would both have an index of 0, which the subsequent code 
would incorrectly resolve to the first
+ * field.
+ * We cannot use the PreparedStatement as we have not got 
access to the statement here.
+ * We cannot use ParameterizedPredicate as it takes the filter 
expression as input (e.g EQUALS(...)
+ * not the form we have here an example would be ('field1'= ?).
+ *
+ * An entry in the resolvedPredicates list may have more than 
one associated pushdown parameter, for example
+ * a query like this : ... on e.type = 2 and (e.age = 50 OR 
height > 90)  and a.ip = e.ip;
+ * will have 2 resolvedPredicates and 3 pushdownParams. The 
2nd and 3rd pushdownParams will be for the second
+ * resolvedPredicate.
+ *
+ */
+ArrayList paramsForThisPredicate = new ArrayList();
+char placeholderChar =
+
JdbcFilterPushdownPreparedStatementVisitor.PUSHDOWN_PREDICATE_PLACEHOLDER
+.charAt(0);
+
+int count =
+(int) resolvedPredicate.chars().filter(ch -> ch == 
placeholderChar).count();
+
+for (int j = processedPushdownParamsIndex;
+j < processedPushdownParamsIndex + count;
+j++) {
+
paramsForThisPredicate.add(this.pushdownParams[j].toString());
+}
+processedPushdownParamsIndex = processedPushdownParamsIndex + 
count;

Review Comment:
   @snuyanzin Yes I had not thought of that. I have reviewed this. I see 3 
options I am looking to explore:

   1. Amend the string processing to cope with the scenario you have 
highlighted. This is tricky, as I will need to parse the string looking for 
simple binary operators and OR operators that link many simple expressions, 
when there might be quotes in the column names. This seems overly complicated 
and fragile.
   2. Investigate whether I can change how `this.pushdownParams` and 
`this.resolvedPredicates` are populated, or add more instance variables, so 
they can be processed simply without complex string logic. 
   3. Remove the pushdown capability for lookup joins from this connector to 
enable the release. I think this is looking more attractive as the pragmatic 
approach, given what we are seeing.
   
   I hope 2. will be fruitful.  
   
  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-02 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1438684576


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -96,28 +97,115 @@ public JdbcDynamicTableSource(
 public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
 // JDBC only support non-nested look up keys
 String[] keyNames = new String[context.getKeys().length];
+
 for (int i = 0; i < keyNames.length; i++) {
 int[] innerKeyArr = context.getKeys()[i];
 Preconditions.checkArgument(
 innerKeyArr.length == 1, "JDBC only support non-nested 
look up keys");
 keyNames[i] = 
DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
 }
+
 final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
+String[] conditions = null;
+
+if (this.resolvedPredicates != null) {
+conditions = new String[this.resolvedPredicates.size()];
+int processedPushdownParamsIndex = 0;
+for (int i = 0; i < this.resolvedPredicates.size(); i++) {
+String resolvedPredicate = this.resolvedPredicates.get(i);
+
+/*
+ * This replace seems like it should be using a Flink class to 
resolve the parameter. It does not
+ * effect the dialects as the placeholder comes from 
JdbcFilterPushdownPreparedStatementVisitor.
+ *
+ * Here is what has been considered as alternatives.
+ *
+ * We cannot use the way this is done in 
getScanRuntimeProvider, as the index we have is the index
+ * into the filters, but it needs the index into the fields. 
For example one lookup key and one filter
+ * would both have an index of 0, which the subsequent code 
would incorrectly resolve to the first
+ * field.
+ * We cannot use the PreparedStatement as we have not got 
access to the statement here.
+ * We cannot use ParameterizedPredicate as it takes the filter 
expression as input (e.g EQUALS(...)
+ * not the form we have here an example would be ('field1'= ?).
+ *
+ * An entry in the resolvedPredicates list may have more than 
one associated pushdown parameter, for example
+ * a query like this : ... on e.type = 2 and (e.age = 50 OR 
height > 90)  and a.ip = e.ip;
+ * will have 2 resolvedPredicates and 3 pushdownParams. The 
2nd and 3rd pushdownParams will be for the second
+ * resolvedPredicate.
+ *
+ */
+ArrayList paramsForThisPredicate = new ArrayList();
+char placeholderChar =
+
JdbcFilterPushdownPreparedStatementVisitor.PUSHDOWN_PREDICATE_PLACEHOLDER
+.charAt(0);
+
+int count =
+(int) resolvedPredicate.chars().filter(ch -> ch == 
placeholderChar).count();
+
+for (int j = processedPushdownParamsIndex;
+j < processedPushdownParamsIndex + count;
+j++) {
+
paramsForThisPredicate.add(this.pushdownParams[j].toString());
+}
+processedPushdownParamsIndex = processedPushdownParamsIndex + 
count;

Review Comment:
   @snuyanzin Yes I had not thought of that. I have reviewed this. I see 3 
options I am looking to explore:

   1. Amend the string processing to cope with the scenario you have 
highlighted. This is tricky, as I will need to parse the string looking for 
simple binary operators and OR operators that link many simple expressions, 
when there might be quotes in the column names. This seems overly complicated 
and fragile.
   2. Investigate whether I can change how `this.pushdownParams` and 
`this.resolvedPredicates` are populated, or add more instance variables, so 
they can be processed simply without complex string logic. 
   3. Remove the pushdown capability from this connector to enable the release. 
I think this is looking more attractive as the pragmatic approach, given what 
we are seeing.
   
   I hope 2. will be fruitful.  
   
  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33694][gs-fs-hadoop] Support overriding GCS root URL [flink]

2024-01-02 Thread via GitHub


patricklucas commented on PR #23836:
URL: https://github.com/apache/flink/pull/23836#issuecomment-1874157629

   I do not think there is much room for unexpected behavior here. The surface 
area of the change is quite small and follows the existing model for 
translating a Hadoop config option specific to this filesystem implementation 
into its native SDK equivalent.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space

2024-01-02 Thread Zhanghao Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801815#comment-17801815
 ] 

Zhanghao Chen commented on FLINK-33940:
---

[~mxm] There's no concrete rationale for selecting the factor 5; it's just a 
magic number that most users are satisfied with in our production environment. 
I'd be good with 10 as well. Do you have any preference on the selection of the 
factor?

> Update the auto-derivation rule of max parallelism for enlarged upscaling 
> space
> ---
>
> Key: FLINK-33940
> URL: https://issues.apache.org/jira/browse/FLINK-33940
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>
> *Background*
> The choice of the max parallelism of a stateful operator is important, as it 
> limits the upper bound of the operator's parallelism while also adding extra 
> overhead when set too large. Currently, the max parallelism of an operator is 
> either fixed to a value specified via the API / pipeline option or 
> auto-derived with the following rule:
> {{min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)}}
> *Problem*
> Recently, the elasticity of Flink jobs has become more and more valued by 
> users. The current auto-derived max parallelism was introduced a long time ago 
> and only allows the operator parallelism to be roughly doubled, which is not 
> desirable for elasticity. Setting a max parallelism manually may not be 
> desirable either: users may not have sufficient expertise to select a good 
> max-parallelism value.
> *Proposal*
> Update the auto-derivation rule of max parallelism to derive a larger max 
> parallelism for a better out-of-the-box elasticity experience. A candidate is 
> as follows:
> {{min(max(roundUpToPowerOfTwo(operatorParallelism * {*}5{*}), {*}1024{*}), 
> 32767)}}
> Looking forward to your opinions on this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix] Update copyright NOTICE year to 2024 [flink]

2024-01-02 Thread via GitHub


flinkbot commented on PR #24018:
URL: https://github.com/apache/flink/pull/24018#issuecomment-1874115436

   
   ## CI report:
   
   * 42c6079b521c50cc7e0e8017c29ce70f634688d3 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] Update copyright NOTICE year to 2024 [flink]

2024-01-02 Thread via GitHub


caicancai commented on PR #24018:
URL: https://github.com/apache/flink/pull/24018#issuecomment-1874107814

   cc @1996fanrui 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [hotfix] Update copyright NOTICE year to 2024 [flink]

2024-01-02 Thread via GitHub


caicancai opened a new pull request, #24018:
URL: https://github.com/apache/flink/pull/24018

   ## What is the purpose of the change
   
   Update copyright NOTICE year to 2024
   
   ## Brief change log
   Update copyright NOTICE year to 2024
   
   ## Verifying this change
   Update copyright NOTICE year to 2024
   
   ## Does this pull request potentially affect one of the following parts:
   
   no
   
   ## Documentation
   Does this pull request introduce a new feature? (no)
   If yes, how is the feature documented? (not documented)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-33433) Support invoke async-profiler on Jobmanager through REST API

2024-01-02 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-33433.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

merged in master: 
240494fd6169cb98b47808a003ee00804a780360...3efe9d2b09bedde89322594f0f3927004b6b1adf

> Support invoke async-profiler on Jobmanager through REST API
> 
>
> Key: FLINK-33433
> URL: https://issues.apache.org/jira/browse/FLINK-33433
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / REST
>Affects Versions: 1.19.0
>Reporter: Yu Chen
>Assignee: Yu Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space

2024-01-02 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801790#comment-17801790
 ] 

Maximilian Michels commented on FLINK-33940:


[~Zhanghao Chen] Thank you for the proposal. I agree with using highly 
composite numbers, as this will provide more flexibility to the autoscaler. I'm 
not sure about the {{operatorParallelism * 5}}, though. What is the rationale 
for selecting this factor? Why not {{*10}}? 

> Update the auto-derivation rule of max parallelism for enlarged upscaling 
> space
> ---
>
> Key: FLINK-33940
> URL: https://issues.apache.org/jira/browse/FLINK-33940
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>
> *Background*
> The choice of the max parallelism of a stateful operator is important, as it 
> limits the upper bound of the operator's parallelism while also adding extra 
> overhead when set too large. Currently, the max parallelism of an operator is 
> either fixed to a value specified via the API / pipeline option or 
> auto-derived with the following rule:
> {{min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)}}
> *Problem*
> Recently, the elasticity of Flink jobs has become more and more valued by 
> users. The current auto-derived max parallelism was introduced a long time ago 
> and only allows the operator parallelism to be roughly doubled, which is not 
> desirable for elasticity. Setting a max parallelism manually may not be 
> desirable either: users may not have sufficient expertise to select a good 
> max-parallelism value.
> *Proposal*
> Update the auto-derivation rule of max parallelism to derive a larger max 
> parallelism for a better out-of-the-box elasticity experience. A candidate is 
> as follows:
> {{min(max(roundUpToPowerOfTwo(operatorParallelism * {*}5{*}), {*}1024{*}), 
> 32767)}}
> Looking forward to your opinions on this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33433][rest] Introduce async-profiler to support profiling Job… [flink]

2024-01-02 Thread via GitHub


Myasuka closed pull request #23820: [FLINK-33433][rest] Introduce 
async-profiler to support profiling Job…
URL: https://github.com/apache/flink/pull/23820


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-02 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1438684576


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -96,28 +97,115 @@ public JdbcDynamicTableSource(
 public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
 // JDBC only support non-nested look up keys
 String[] keyNames = new String[context.getKeys().length];
+
 for (int i = 0; i < keyNames.length; i++) {
 int[] innerKeyArr = context.getKeys()[i];
 Preconditions.checkArgument(
 innerKeyArr.length == 1, "JDBC only support non-nested 
look up keys");
 keyNames[i] = 
DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
 }
+
 final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
+String[] conditions = null;
+
+if (this.resolvedPredicates != null) {
+conditions = new String[this.resolvedPredicates.size()];
+int processedPushdownParamsIndex = 0;
+for (int i = 0; i < this.resolvedPredicates.size(); i++) {
+String resolvedPredicate = this.resolvedPredicates.get(i);
+
+/*
+ * This replace seems like it should be using a Flink class to 
resolve the parameter. It does not
+ * effect the dialects as the placeholder comes from 
JdbcFilterPushdownPreparedStatementVisitor.
+ *
+ * Here is what has been considered as alternatives.
+ *
+ * We cannot use the way this is done in 
getScanRuntimeProvider, as the index we have is the index
+ * into the filters, but it needs the index into the fields. 
For example one lookup key and one filter
+ * would both have an index of 0, which the subsequent code 
would incorrectly resolve to the first
+ * field.
+ * We cannot use the PreparedStatement as we have not got 
access to the statement here.
+ * We cannot use ParameterizedPredicate as it takes the filter 
expression as input (e.g EQUALS(...)
+ * not the form we have here an example would be ('field1'= ?).
+ *
+ * An entry in the resolvedPredicates list may have more than 
one associated pushdown parameter, for example
+ * a query like this : ... on e.type = 2 and (e.age = 50 OR 
height > 90)  and a.ip = e.ip;
+ * will have 2 resolvedPredicates and 3 pushdownParams. The 
2nd and 3rd pushdownParams will be for the second
+ * resolvedPredicate.
+ *
+ */
+ArrayList paramsForThisPredicate = new ArrayList();
+char placeholderChar =
+
JdbcFilterPushdownPreparedStatementVisitor.PUSHDOWN_PREDICATE_PLACEHOLDER
+.charAt(0);
+
+int count =
+(int) resolvedPredicate.chars().filter(ch -> ch == 
placeholderChar).count();
+
+for (int j = processedPushdownParamsIndex;
+j < processedPushdownParamsIndex + count;
+j++) {
+
paramsForThisPredicate.add(this.pushdownParams[j].toString());
+}
+processedPushdownParamsIndex = processedPushdownParamsIndex + 
count;

Review Comment:
   @snuyanzin Yes I had not thought of that. I have reviewed this. I see 2 
options I am looking to explore:

   1. Amend the string processing to cope with the scenario you have 
highlighted. This is tricky, as I will need to parse the string looking for 
simple binary operators and OR operators that link many simple expressions, 
when there might be quotes in the column names. This seems overly complicated 
and fragile.
   2. Investigate whether I can change how `this.pushdownParams` and 
`this.resolvedPredicates` are populated, or add more instance variables, so 
they can be processed simply without complex string logic. 
   
   I hope 2. will be fruitful.  
   
  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33611) Support Large Protobuf Schemas

2024-01-02 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801783#comment-17801783
 ] 

Benchao Li commented on FLINK-33611:


{quote}the main proposal here is to now reuse the variable names across 
different split method scopes. This will greatly reduce the constant pool size.
{quote}
[~dsaisharath] I'm wondering where the "constants" come from; IIUC, local 
variables in a method should not affect the class's constant pool?

> Support Large Protobuf Schemas
> --
>
> Key: FLINK-33611
> URL: https://issues.apache.org/jira/browse/FLINK-33611
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.18.0
>Reporter: Sai Sharath Dandi
>Assignee: Sai Sharath Dandi
>Priority: Major
>  Labels: pull-request-available
>
> h3. Background
> Flink serializes and deserializes protobuf format data by calling the decode 
> or encode method in GeneratedProtoToRow_XXX.java generated by codegen to 
> parse byte[] data into Protobuf Java objects. FLINK-32650 has introduced the 
> ability to split the generated code to improve the performance for large 
> Protobuf schemas. However, this is still not sufficient to support some 
> larger protobuf schemas as the generated code exceeds the java constant pool 
> size [limit|https://en.wikipedia.org/wiki/Java_class_file#The_constant_pool] 
> and we can see errors like "Too many constants" when trying to compile the 
> generated code. 
> *Solution*
> Since the code-splitting functionality is already in place, the main proposal 
> here is to reuse variable names across the different split method scopes, 
> which will greatly reduce the constant pool size. One more optimization is to 
> split the last code segment only when its size exceeds the split threshold. 
> Currently, the last segment of the generated code is always split, which can 
> lead to too many split methods and thus exceed the constant pool size limit.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

