[jira] [Closed] (FLINK-34481) Migrate SetOpRewriteUtil

2024-02-29 Thread Jacky Lau (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacky Lau closed FLINK-34481.
-
Release Note: merged f523b9d6191ecb584e36aa2aeffcd0659ce231f7
  Resolution: Fixed

> Migrate SetOpRewriteUtil
> 
>
> Key: FLINK-34481
> URL: https://issues.apache.org/jira/browse/FLINK-34481
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: Jacky Lau
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> We should migrate SetOpRewriteUtil, which is used by:
> ReplaceIntersectWithSemiJoinRule
> ReplaceMinusWithAntiJoinRule
> RewriteIntersectAllRule
> RewriteMinusAllRule



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34481) Migrate SetOpRewriteUtil

2024-02-29 Thread Jacky Lau (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822434#comment-17822434
 ] 

Jacky Lau commented on FLINK-34481:
---

Merged f523b9d6191ecb584e36aa2aeffcd0659ce231f7

> Migrate SetOpRewriteUtil
> 
>
> Key: FLINK-34481
> URL: https://issues.apache.org/jira/browse/FLINK-34481
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: Jacky Lau
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> We should migrate SetOpRewriteUtil, which is used by:
> ReplaceIntersectWithSemiJoinRule
> ReplaceMinusWithAntiJoinRule
> RewriteIntersectAllRule
> RewriteMinusAllRule



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix][runtime] Remove unneeded requestTaskManagerFileUploadByName [flink]

2024-02-29 Thread via GitHub


1996fanrui merged PR #24386:
URL: https://github.com/apache/flink/pull/24386


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][runtime] Remove unneeded requestTaskManagerFileUploadByName [flink]

2024-02-29 Thread via GitHub


1996fanrui commented on PR #24386:
URL: https://github.com/apache/flink/pull/24386#issuecomment-1972683847

   Thanks @yuchen-ecnu for the review, merging~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-02-29 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822433#comment-17822433
 ] 

Shuai Xu commented on FLINK-34380:
--

Let me take a look.

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.19.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> ++-+-+-+-+
> | op |           a |               b |          a0 |      b0 |
> ++-+-+-+-+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> ++-+-+-+-+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34481][table] Migrate SetOpRewriteUtil to java [flink]

2024-02-29 Thread via GitHub


JingGe merged PR #24358:
URL: https://github.com/apache/flink/pull/24358


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34522][core] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter [flink]

2024-02-29 Thread via GitHub


1996fanrui commented on PR #24387:
URL: https://github.com/apache/flink/pull/24387#issuecomment-1972677393

   Many thanks @HuangXingBo for the debugging.
   
   I'm on vacation and don't have my Mac with me, so my colleague @RocMarshal is helping to 
fix this for 1.19 first, since 1.19 will be released soon.
   
   This PR targets master (1.20); let's do it after that, thanks~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-xxxxx][api] Introduce DataStream, Partitioning and ProcessFunction [flink]

2024-02-29 Thread via GitHub


flinkbot commented on PR #24422:
URL: https://github.com/apache/flink/pull/24422#issuecomment-1972653582

   
   ## CI report:
   
   * cb4615792281b375a45da41795e9f1a0cd70ae2d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34487) Integrate tools/azure-pipelines/build-python-wheels.yml into GHA nightly workflow

2024-02-29 Thread Muhammet Orazov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822420#comment-17822420
 ] 

Muhammet Orazov commented on FLINK-34487:
-

Hey [~mapohl], I'd like to work on this. Could you please assign this to me? 
Thanks!

> Integrate tools/azure-pipelines/build-python-wheels.yml into GHA nightly 
> workflow
> -
>
> Key: FLINK-34487
> URL: https://issues.apache.org/jira/browse/FLINK-34487
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System / CI
>Affects Versions: 1.19.0, 1.18.1, 1.20.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: github-actions
>
> Analogously to the [Azure Pipelines nightly 
> config|https://github.com/apache/flink/blob/e923d4060b6dabe650a8950774d176d3e92437c2/tools/azure-pipelines/build-apache-repo.yml#L183]
>  we want to generate the wheels artifacts in the GHA nightly workflow as well.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-xxxxx][api] Introduce DataStream, Partitioning and ProcessFunction [flink]

2024-02-29 Thread via GitHub


reswqa opened a new pull request, #24422:
URL: https://github.com/apache/flink/pull/24422

   ## What is the purpose of the change
   
   *This is the first PR for DataStream V2, and aims to implement FLIP-409: 
DataStream V2 Building Blocks: DataStream, Partitioning and ProcessFunction.*
   
   
   ## Brief change log
   
 - *Introduce the four types of `streams` in FLIP-409.*
 - *Introduce `ProcessFunction` and all of its variants.*
 - *Introduce a new `ExecutionEnvironment` to submit jobs written in the new 
API.*
 - *Support FLIP-27 based `Source` and `Sink-v2`.*
   
   
   ## Verifying this change
   
   This change added new tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - Documentation needs to be added in subsequent PR.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34535] Support JobPlanInfo for the explain result [flink]

2024-02-29 Thread via GitHub


huyuanfeng2018 commented on PR #24411:
URL: https://github.com/apache/flink/pull/24411#issuecomment-1972635698

   > Hi @huyuanfeng2018 thanks for the PR. I have a few comments:
   > 
   > * Please add `JSON_JOB_PLAN `  to the `python` API (e.g.,  to the 
`basic_operations.py`) and test it there as well (e.g., in 
`test_table_environment.py`)
   > * Why the `JSON_JOB_PLAN` keyword didn't end up in our parser 
(`Parser.tdd`, `Parser.jj`)?
   > * The main motivation behind this PR "' combine it with the parameter 
`pipeline.jobvertex-parallelism-overrides` to set up my task parallelism'" is 
not clear to me and is missing in this PR tests
   > * Add the necessary documentation to the `explain.md`
   > * Please add test with `Hive` as well (e.g., see 
`HiveTableSinkITCase::testHiveTableSinkWithParallelismBase`
   > * Add `JSON_JOB_PLAN ` to the `FlinkSqlParserImplConstants`
   > * Update `TableTestBase::doVerifyExplain` to support `JSON_JOB_PLAN` as 
well
   > * Tests with table API are missing
   
   Thank you very much for your review. There is still a lot of work to be done 
on this PR. I will mark it as a draft first.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34180) Accept Flink CDC project as part of Apache Flink

2024-02-29 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-34180:
---
Description: 
As discussed in  Flink dev  mailing list[1][2], we have accepted the Flink CDC 
project contribution, we should finish the repo and doc migration as soon as 
possible.

[1] https://lists.apache.org/thread/sq5w21tcomrmb025tl820cxty9l0z26w
[2] https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob

  was:
As discussed in  Flink dev  mailing list[1][2], we have accepted the Flink CDC 
project contribution, we should finished the repo and doc migration as soon as 
possible.

[1] https://lists.apache.org/thread/sq5w21tcomrmb025tl820cxty9l0z26w
[2] https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob


> Accept Flink CDC project as part of Apache Flink
> 
>
> Key: FLINK-34180
> URL: https://issues.apache.org/jira/browse/FLINK-34180
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: Leonard Xu
>Assignee: Leonard Xu
>Priority: Major
>
> As discussed in  Flink dev  mailing list[1][2], we have accepted the Flink 
> CDC project contribution, we should finish the repo and doc migration as soon 
> as possible.
> [1] https://lists.apache.org/thread/sq5w21tcomrmb025tl820cxty9l0z26w
> [2] https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34529) Projection cannot be pushed down through rank operator.

2024-02-29 Thread yisha zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822415#comment-17822415
 ] 

yisha zhou commented on FLINK-34529:


Hi [~lincoln.86xy], thanks for your advice.

After discussing with [~libenchao], I agree that putting these kinds of rules 
into the cost-based planner is in line with the future direction.

Meanwhile, I found that most of the ProjectXxTransposeRules are in 
`FlinkStreamRuleSets#PROJECT_RULES`, and `PROJECT_RULES` seems to be used both in 
'LOGICAL' (Volcano) and 'PROJECT_REWRITE' (HEP). I plan to add 
`CoreRules.PROJECT_WINDOW_TRANSPOSE` to `FlinkStreamRuleSets#PROJECT_RULES` too, 
so that both kinds of planner can utilize the rule. WDYT?
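For illustration, the change would amount to including the Calcite core rule in the relevant rule set; a minimal Java sketch of the idea (the actual `FlinkStreamRuleSets` is Scala, and the class and rule-set names here are placeholders):
{code:java}
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.calcite.tools.RuleSet;
import org.apache.calcite.tools.RuleSets;

// Hypothetical sketch only: shows how a Calcite core rule is added to a rule set
// so that projections can be pushed below window/rank nodes.
public class ProjectRuleSetSketch {
    public static final RuleSet PROJECT_RULES =
            RuleSets.ofList(
                    // transposes a Project past a Window (e.g. ROW_NUMBER) node
                    CoreRules.PROJECT_WINDOW_TRANSPOSE,
                    // merges adjacent projections
                    CoreRules.PROJECT_MERGE);
}
{code}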

> Projection cannot be pushed down through rank operator.
> ---
>
> Key: FLINK-34529
> URL: https://issues.apache.org/jira/browse/FLINK-34529
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: yisha zhou
>Assignee: yisha zhou
>Priority: Major
>
> When there is a rank/deduplicate operator, the projection based on output of 
> this operator cannot be pushed down to the input of it.
> The following code can help reproducing the issue:
> {code:java}
> val util = streamTestUtil() 
> util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
> util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
> val sql =
>   """
> |SELECT a FROM (
> |  SELECT a, f,
> |  ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
> |  FROM  T1, T2
> |  WHERE T1.a = T2.d
> |)
> |WHERE rank_num = 1
>   """.stripMargin
> util.verifyPlan(sql){code}
> The plan is expected to be:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>+- Exchange(distribution=[hash[f]])
>   +- Calc(select=[a, c, f])
>  +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a]])
> :  +- Calc(select=[a, c])
> : +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[d]])
>+- Calc(select=[d, f])
>   +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) 
> {code}
> Notice that the 'select' of Join operator is [a, c, d, f]. However the actual 
> plan is:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>+- Exchange(distribution=[hash[f]])
>   +- Calc(select=[a, c, f])
>  +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, 
> e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a]])
> :  +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[d]])
>+- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
>  {code}
> the 'select' of Join operator is [a, b, c, d, e, f], which means the 
> projection in the final Calc is not passed through the Rank.
> And I think an easy way to fix this issue is to add 
> org.apache.calcite.rel.rules.ProjectWindowTransposeRule into 
> FlinkStreamRuleSets.LOGICAL_OPT_RULES.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34548) FLIP-409: DataStream V2 Building Blocks: DataStream, Partitioning and ProcessFunction

2024-02-29 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-34548:
---
Labels:   (was: Umbrella)

> FLIP-409: DataStream V2 Building Blocks: DataStream, Partitioning and 
> ProcessFunction
> -
>
> Key: FLINK-34548
> URL: https://issues.apache.org/jira/browse/FLINK-34548
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>
> This is the umbrella ticket for FLIP-409: DataStream V2 Building Blocks: 
> DataStream, Partitioning and ProcessFunction.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34548) FLIP-409: DataStream V2 Building Blocks: DataStream, Partitioning and ProcessFunction

2024-02-29 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-34548:
---
Description: This is the ticket for FLIP-409: DataStream V2 Building 
Blocks: DataStream, Partitioning and ProcessFunction.  (was: This is the 
umbrella ticket for FLIP-409: DataStream V2 Building Blocks: DataStream, 
Partitioning and ProcessFunction.)

> FLIP-409: DataStream V2 Building Blocks: DataStream, Partitioning and 
> ProcessFunction
> -
>
> Key: FLINK-34548
> URL: https://issues.apache.org/jira/browse/FLINK-34548
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>
> This is the ticket for FLIP-409: DataStream V2 Building Blocks: DataStream, 
> Partitioning and ProcessFunction.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-02-29 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822401#comment-17822401
 ] 

xuyang commented on FLINK-34380:


Hi [~xu_shuai_], can you help check it again?

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.19.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> ++-+-+-+-+
> | op |           a |               b |          a0 |      b0 |
> ++-+-+-+-+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> ++-+-+-+-+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34535) Support JobPlanInfo for the explain result

2024-02-29 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822399#comment-17822399
 ] 

yuanfenghu commented on FLINK-34535:


[~lincoln.86xy] 
Thank you for your comment,

> Regarding to the motivation 'combine it with the parameter 
> `pipeline.jobvertex-parallelism-overrides` to set up my task parallelism', 
> could you explain more about it?

The `pipeline.jobvertex-parallelism-overrides` parameter can override the 
parallelism of each Flink vertex before Flink runs the job. It requires specifying 
the parallelism per JobGraph vertex, like this:
{code:java}
pipeline.jobvertex-parallelism-overrides = 
0a448493b4782967b150582570326227:4,bc764cd8ddf7a0cff126f51c16239658:3 {code}
This way, when Flink runs the job, the parallelism of the corresponding vertex IDs 
0a448493b4782967b150582570326227 and bc764cd8ddf7a0cff126f51c16239658 will be 
set to 4 and 3 respectively.
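For illustration, the same override can also be passed programmatically when the job is built; a minimal sketch (the vertex IDs below are just the placeholders from the example above):
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismOverrideSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // same format as the config string above: vertexId:parallelism, comma separated
        conf.setString(
                "pipeline.jobvertex-parallelism-overrides",
                "0a448493b4782967b150582570326227:4,bc764cd8ddf7a0cff126f51c16239658:3");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the job; the overridden vertices run with parallelism 4 and 3
    }
}
{code}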

So my motivation is: I want to set the parallelism of each task in a job 
generated by Flink SQL, but in Flink SQL the parallelism is set globally, so 
I need to get each JobVertexID before the job is run. The existing 
explain does not return this information, so I would like explain to be able 
to return it.

> (Also adding new mode to current `ExplainDetail` is a public api change, 
>there should be a 
>[FLIP|https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals?src=contextnavpagetreemode]
> discussion)

 
I noticed `ExplainDetail` is annotated with @PublicEvolving, but my proposed change is 
relatively simple; does it really need a separate FLIP? Alternatively, what about 
putting this information into JSON_EXECUTION_PLAN?

 
 

>  COMPILE PLAN

COMPILE PLAN seems to be a reasonable way as well. Could you help ping some 
people who are more familiar with this area (Flink SQL) to 
discuss their views on this issue?

> Support JobPlanInfo for the explain result
> --
>
> Key: FLINK-34535
> URL: https://issues.apache.org/jira/browse/FLINK-34535
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: yuanfenghu
>Priority: Major
>  Labels: pull-request-available
>
> In the Flink Sql Explain syntax, we can set ExplainDetails to plan 
> JSON_EXECUTION_PLAN, but we cannot plan JobPlanInfo. If we can explain this 
> part of the information, referring to JobPlanInfo, I can combine it with the 
> parameter `pipeline.jobvertex-parallelism-overrides` to set up my task 
> parallelism



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-29 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1508526295


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/utils/PathUtils.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.utils;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/** Path utils for file system. */
+public class PathUtils {

Review Comment:
   I originally planned to move the logic for generating the staging directory 
into `FileSystemOutputFormat`. However, when modifying the `HiveTableSink`, I 
found that `createBatchCompactSink` relies on tmpPath.
   
   I didn't want to introduce changes to other modules, so I ultimately opted 
for a compromise and extracted this utility method.
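   For readers following along, the essence of such a helper is to derive a unique, per-write staging directory instead of a shared one; a hypothetical sketch (names and layout are assumptions, not the PR's actual code):
   ```java
   import org.apache.flink.core.fs.Path;
   
   import java.util.UUID;
   
   /** Hypothetical sketch of a staging-path helper; not the PR's actual PathUtils. */
   public final class StagingPathSketch {
   
       /** Create a unique staging directory so concurrent writers to the same table don't collide. */
       public static Path newStagingDir(Path tableLocation) {
           return new Path(
                   tableLocation,
                   ".staging_" + System.currentTimeMillis() + "_" + UUID.randomUUID());
       }
   }
   ```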



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-34535) Support JobPlanInfo for the explain result

2024-02-29 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822390#comment-17822390
 ] 

lincoln lee edited comment on FLINK-34535 at 3/1/24 5:43 AM:
-

[~heigebupahei] The current `explain` syntax is just for showing the plan info. If 
we want to override something in the actual plan used for execution, I suggest 
extending the current compiled plan, since we already have a way to compile a 
query and execute the compiled plan:
{code:java}
COMPILE PLAN FOR '' 
EXECUTE PLAN FOR ''{code}
(Also adding new mode to current `ExplainDetail` is a public api change, there 
should be a 
[FLIP|https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals?src=contextnavpagetreemode]
 discussion)
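For completeness, the same workflow is available from the Table API; a minimal sketch (assuming the CompiledPlan API, with placeholder statements and paths):
{code:java}
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.TableEnvironment;

public class CompiledPlanSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // compile the query once and persist its execution plan
        CompiledPlan plan = tEnv.compilePlanSql("INSERT INTO sink_table SELECT * FROM source_table");
        plan.writeToFile("/tmp/plan.json");
        // later: load the (possibly hand-tuned) plan and execute it
        tEnv.loadPlan(PlanReference.fromFile("/tmp/plan.json")).execute();
    }
}
{code}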

Regarding to the motivation 'combine it with the parameter 
`pipeline.jobvertex-parallelism-overrides` to set up my task parallelism', 
could you explain more about it?


was (Author: lincoln.86xy):
[~heigebupahei] The current `explain` syntax is just for showing the planinfo, 
if we want to override something to the real plan which is for execution, I 
suggest to extend current compiled plan, as we have such ways to compile a 
query and execute the compiled plan:
{code:java}
COMPILE PLAN FOR '' 
EXECUTE PLAN FOR ''{code}
(Also adding new mode to current `ExplainDetail` is a public api change, there 
should be a 
[FLIP|https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals?src=contextnavpagetreemode]
 discussion)

Regarding to the motivation 'combine it with the parameter 
`pipeline.jobvertex-parallelism-overrides` to set up my task parallelism', 
could you explain more about it?

> Support JobPlanInfo for the explain result
> --
>
> Key: FLINK-34535
> URL: https://issues.apache.org/jira/browse/FLINK-34535
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: yuanfenghu
>Priority: Major
>  Labels: pull-request-available
>
> In the Flink Sql Explain syntax, we can set ExplainDetails to plan 
> JSON_EXECUTION_PLAN, but we cannot plan JobPlanInfo. If we can explain this 
> part of the information, referring to JobPlanInfo, I can combine it with the 
> parameter `pipeline.jobvertex-parallelism-overrides` to set up my task 
> parallelism



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34535) Support JobPlanInfo for the explain result

2024-02-29 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822390#comment-17822390
 ] 

lincoln lee commented on FLINK-34535:
-

[~heigebupahei] The current `explain` syntax is just for showing the plan info. If 
we want to override something in the actual plan used for execution, I suggest 
extending the current compiled plan, since we already have a way to compile a 
query and execute the compiled plan:
{code:java}
COMPILE PLAN FOR '' 
EXECUTE PLAN FOR ''{code}
(Also adding new mode to current `ExplainDetail` is a public api change, there 
should be a 
[FLIP|https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals?src=contextnavpagetreemode]
 discussion)

Regarding to the motivation 'combine it with the parameter 
`pipeline.jobvertex-parallelism-overrides` to set up my task parallelism', 
could you explain more about it?

> Support JobPlanInfo for the explain result
> --
>
> Key: FLINK-34535
> URL: https://issues.apache.org/jira/browse/FLINK-34535
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: yuanfenghu
>Priority: Major
>  Labels: pull-request-available
>
> In the Flink Sql Explain syntax, we can set ExplainDetails to plan 
> JSON_EXECUTION_PLAN, but we cannot plan JobPlanInfo. If we can explain this 
> part of the information, referring to JobPlanInfo, I can combine it with the 
> parameter `pipeline.jobvertex-parallelism-overrides` to set up my task 
> parallelism



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34556][table] Migrate EnumerableToLogicalTableScan to java. [flink]

2024-02-29 Thread via GitHub


flinkbot commented on PR #24421:
URL: https://github.com/apache/flink/pull/24421#issuecomment-1972492174

   
   ## CI report:
   
   * c060447bb29118c3127c63eb09290a8c02c43e85 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34556) Migrate EnumerableToLogicalTableScan

2024-02-29 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34556:
---
Labels: pull-request-available  (was: )

> Migrate EnumerableToLogicalTableScan
> 
>
> Key: FLINK-34556
> URL: https://issues.apache.org/jira/browse/FLINK-34556
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: Jacky Lau
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-34556][table] Migrate EnumerableToLogicalTableScan to java. [flink]

2024-02-29 Thread via GitHub


liuyongvs opened a new pull request, #24421:
URL: https://github.com/apache/flink/pull/24421

   ## What is the purpose of the change
   
   The PR migrates EnumerableToLogicalTableScan to Java.
   It doesn't touch EnumerableToLogicalTableScanTest, to make sure the Java 
version keeps passing it.
   
   ## Verifying this change
   
   This change is already covered by existing tests
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34556) Migrate EnumerableToLogicalTableScan

2024-02-29 Thread Jacky Lau (Jira)
Jacky Lau created FLINK-34556:
-

 Summary: Migrate EnumerableToLogicalTableScan
 Key: FLINK-34556
 URL: https://issues.apache.org/jira/browse/FLINK-34556
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Affects Versions: 1.20.0
Reporter: Jacky Lau
 Fix For: 1.20.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34483][docs] Improve the documentation of 'state.checkpoints.dir' and 'state.checkpoint-storage' [flink]

2024-02-29 Thread via GitHub


fredia commented on code in PR #24401:
URL: https://github.com/apache/flink/pull/24401#discussion_r1508457644


##
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##
@@ -76,6 +76,39 @@ public class CheckpointingOptions {
  * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} 
method is called.
  *
  * Recognized shortcut names are 'jobmanager' and 'filesystem'.
+ *
+ * {@link #CHECKPOINT_STORAGE} and {@link #CHECKPOINTS_DIRECTORY} are 
usually combined to
+ * configure the checkpoint location. The behaviors of different 
combinations are as follows:

Review Comment:
   Thanks for the suggestion, I reorganized the description as you suggested.
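   For readers of this thread, a hedged illustration of how the two options are usually combined (not taken from the PR itself):
   ```java
   import org.apache.flink.configuration.CheckpointingOptions;
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   
   public class CheckpointStorageConfigSketch {
       public static void main(String[] args) {
           Configuration conf = new Configuration();
           // recognized shortcut names: 'jobmanager' or 'filesystem'
           conf.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
           // the directory is required once a filesystem-based storage is chosen
           conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs:///flink/checkpoints");
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
       }
   }
   ```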



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34522][core] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter [flink]

2024-02-29 Thread via GitHub


HuangXingBo commented on code in PR #24417:
URL: https://github.com/apache/flink/pull/24417#discussion_r1508454782


##
flink-python/pyflink/datastream/state.py:
##
@@ -809,7 +809,7 @@ def cleanup_incrementally(self,
 def cleanup_in_rocksdb_compact_filter(
 self,
 query_time_after_num_entries,
-periodic_compaction_time=Time.days(30)) -> \
+periodic_compaction_time=Duration.of_days(30)) -> \

Review Comment:
   `Duration.of_days(30)` -> None



##
flink-python/pyflink/datastream/state.py:
##
@@ -925,14 +926,14 @@ class 
RocksdbCompactFilterCleanupStrategy(CleanupStrategy):
 
 def __init__(self,
  query_time_after_num_entries: int,
- periodic_compaction_time=Time.days(30)):
+ periodic_compaction_time=Duration.of_days(30)):

Review Comment:
   `Duration.of_days(30)-> None` and do the similar thing to 
`cleanup_in_rocksdb_compact_filter`
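   For context, a sketch of the corresponding Java API after this migration (hedged: it assumes the Duration-based overloads introduced by the Time-to-Duration migration, rather than the deprecated `Time` variants):
   ```java
   import org.apache.flink.api.common.state.StateTtlConfig;
   
   import java.time.Duration;
   
   public class TtlCompactFilterSketch {
       public static StateTtlConfig ttlConfig() {
           // assumption: both Duration overloads exist once the Time -> Duration migration lands
           return StateTtlConfig.newBuilder(Duration.ofDays(7))
                   .cleanupInRocksdbCompactFilter(1000L, Duration.ofDays(30))
                   .build();
       }
   }
   ```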



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34529) Projection cannot be pushed down through rank operator.

2024-02-29 Thread yisha zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822363#comment-17822363
 ] 

yisha zhou commented on FLINK-34529:


[~libenchao]  Thanks for assigning the ticket to me. 

To the point 'I would prefer to putting these transposing rules all in 
"LOGICAL" stage', I agree with that. I'll add `ProjectWindowTransposeRule` to the 
LOGICAL stage. With regard to the already-existing transpose rules in other 
stages, which do you think is better: moving them into LOGICAL in this PR or in an 
independent PR?

For the question 'I'm even wondering if we really need 
CalcRankTransposeRule', I've tried removing it and found that 
`ProjectWindowTransposeRule` can completely cover the functionality of 
`CalcRankTransposeRule` (based on the results of the tests introduced along with 
that rule) and even does a much better job in some cases. Therefore, I propose to 
remove this rule in this PR, WDYT?

 

 

> Projection cannot be pushed down through rank operator.
> ---
>
> Key: FLINK-34529
> URL: https://issues.apache.org/jira/browse/FLINK-34529
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: yisha zhou
>Assignee: yisha zhou
>Priority: Major
>
> When there is a rank/deduplicate operator, the projection based on output of 
> this operator cannot be pushed down to the input of it.
> The following code can help reproducing the issue:
> {code:java}
> val util = streamTestUtil() 
> util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
> util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
> val sql =
>   """
> |SELECT a FROM (
> |  SELECT a, f,
> |  ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
> |  FROM  T1, T2
> |  WHERE T1.a = T2.d
> |)
> |WHERE rank_num = 1
>   """.stripMargin
> util.verifyPlan(sql){code}
> The plan is expected to be:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>+- Exchange(distribution=[hash[f]])
>   +- Calc(select=[a, c, f])
>  +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a]])
> :  +- Calc(select=[a, c])
> : +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[d]])
>+- Calc(select=[d, f])
>   +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) 
> {code}
> Notice that the 'select' of Join operator is [a, c, d, f]. However the actual 
> plan is:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>+- Exchange(distribution=[hash[f]])
>   +- Calc(select=[a, c, f])
>  +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, 
> e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a]])
> :  +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[d]])
>+- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
>  {code}
> the 'select' of Join operator is [a, b, c, d, e, f], which means the 
> projection in the final Calc is not passed through the Rank.
> And I think an easy way to fix this issue is to add 
> org.apache.calcite.rel.rules.ProjectWindowTransposeRule into 
> FlinkStreamRuleSets.LOGICAL_OPT_RULES.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34529) Projection cannot be pushed down through rank operator.

2024-02-29 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822362#comment-17822362
 ] 

lincoln lee commented on FLINK-34529:
-

[~nilerzhou] Thanks for reporting this!
As you folks discussed above, a projection pushdown is indeed needed here, and 
reusing Calcite's core rules is always the first choice for Flink (except for the 
streaming-specific cases, for now).
And for this case itself, can we just add `CoreRules.PROJECT_WINDOW_TRANSPOSE` 
into `FlinkStreamRuleSets#DEFAULT_REWRITE_RULES` (which takes effect during 
the `DEFAULT_REWRITE` RBO phase)? As in the current code, we tend to treat 
projection/predicate pushdown as a deterministic optimization. WDYT?

> Projection cannot be pushed down through rank operator.
> ---
>
> Key: FLINK-34529
> URL: https://issues.apache.org/jira/browse/FLINK-34529
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: yisha zhou
>Assignee: yisha zhou
>Priority: Major
>
> When there is a rank/deduplicate operator, the projection based on output of 
> this operator cannot be pushed down to the input of it.
> The following code can help reproducing the issue:
> {code:java}
> val util = streamTestUtil() 
> util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
> util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
> val sql =
>   """
> |SELECT a FROM (
> |  SELECT a, f,
> |  ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
> |  FROM  T1, T2
> |  WHERE T1.a = T2.d
> |)
> |WHERE rank_num = 1
>   """.stripMargin
> util.verifyPlan(sql){code}
> The plan is expected to be:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>+- Exchange(distribution=[hash[f]])
>   +- Calc(select=[a, c, f])
>  +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a]])
> :  +- Calc(select=[a, c])
> : +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[d]])
>+- Calc(select=[d, f])
>   +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) 
> {code}
> Notice that the 'select' of Join operator is [a, c, d, f]. However the actual 
> plan is:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>+- Exchange(distribution=[hash[f]])
>   +- Calc(select=[a, c, f])
>  +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, 
> e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a]])
> :  +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[d]])
>+- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
>  {code}
> the 'select' of Join operator is [a, b, c, d, e, f], which means the 
> projection in the final Calc is not passed through the Rank.
> And I think an easy way to fix this issue is to add 
> org.apache.calcite.rel.rules.ProjectWindowTransposeRule into 
> FlinkStreamRuleSets.LOGICAL_OPT_RULES.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34555) Migrate JoinConditionTypeCoerceRule

2024-02-29 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34555:
---
Labels: pull-request-available  (was: )

> Migrate JoinConditionTypeCoerceRule
> ---
>
> Key: FLINK-34555
> URL: https://issues.apache.org/jira/browse/FLINK-34555
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: Jacky Lau
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34555][table] Migrate JoinConditionTypeCoerceRule to java. [flink]

2024-02-29 Thread via GitHub


flinkbot commented on PR #24420:
URL: https://github.com/apache/flink/pull/24420#issuecomment-1972439916

   
   ## CI report:
   
   * 0183b6bb862ceaf1ef3ff96896ac5d673ba59493 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Join condition type coerce rule [flink]

2024-02-29 Thread via GitHub


liuyongvs opened a new pull request, #24420:
URL: https://github.com/apache/flink/pull/24420

   ## What is the purpose of the change
   
   The PR migrates JoinConditionTypeCoerceRule to Java.
   It doesn't touch JoinConditionTypeCoerceRuleTest, to make sure the Java 
version keeps passing it.
   
   ## Verifying this change
   
   This change is already covered by existing tests
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34555) Migrate JoinConditionTypeCoerceRule

2024-02-29 Thread Jacky Lau (Jira)
Jacky Lau created FLINK-34555:
-

 Summary: Migrate JoinConditionTypeCoerceRule
 Key: FLINK-34555
 URL: https://issues.apache.org/jira/browse/FLINK-34555
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Affects Versions: 1.20.0
Reporter: Jacky Lau
 Fix For: 1.20.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [BP-1.18][FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]

2024-02-29 Thread via GitHub


flinkbot commented on PR #24419:
URL: https://github.com/apache/flink/pull/24419#issuecomment-1972391441

   
   ## CI report:
   
   * b514f6bb29bd04049073b0762eacbce63d85e5ba UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]

2024-02-29 Thread via GitHub


JustinLeesin commented on PR #24397:
URL: https://github.com/apache/flink/pull/24397#issuecomment-1972389996

   > @JustinLeesin Thanks for update. LGTM. I'll merge to master after 1.19 
branch cut
   
   @luoyuxia OK, Thank you for taking the time to review my code.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-16627) Support only generate non-null values when serializing into JSON

2024-02-29 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822358#comment-17822358
 ] 

Benchao Li commented on FLINK-16627:


[~nilerzhou] Thank you for your interest in contributing to Flink, I've assigned 
this to you~

> Support only generate non-null values when serializing into JSON
> 
>
> Key: FLINK-16627
> URL: https://issues.apache.org/jira/browse/FLINK-16627
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Planner
>Affects Versions: 1.10.0
>Reporter: jackray wang
>Assignee: yisha zhou
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> auto-unassigned, pull-request-available, sprint
>
> {code:java}
> //sql
> CREATE TABLE sink_kafka ( subtype STRING , svt STRING ) WITH (……)
> {code}
>  
> {code:java}
> //sql
> CREATE TABLE source_kafka ( subtype STRING , svt STRING ) WITH (……)
> {code}
>  
> {code:java}
> //scala udf
> class ScalaUpper extends ScalarFunction {
> def eval(str: String) : String= { 
>if(str == null){
>return ""
>}else{
>return str
>}
> }
> 
> }
> btenv.registerFunction("scala_upper", new ScalaUpper())
> {code}
>  
> {code:java}
> //sql
> insert into sink_kafka select subtype, scala_upper(svt)  from source_kafka
> {code}
>  
>  
> 
> Sometimes the svt's value is null, and the JSON inserted into Kafka looks like  
> \{"subtype":"qin","svt":null}
> If the amount of data is small, it is acceptable, but we process 10TB of data 
> every day, and there may be many nulls in the JSON, which affects the 
> efficiency. If a parameter could be added to remove null keys when defining a 
> sink table, the performance would be greatly improved
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-16627) Support only generate non-null values when serializing into JSON

2024-02-29 Thread Benchao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benchao Li reassigned FLINK-16627:
--

Assignee: yisha zhou

> Support only generate non-null values when serializing into JSON
> 
>
> Key: FLINK-16627
> URL: https://issues.apache.org/jira/browse/FLINK-16627
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Planner
>Affects Versions: 1.10.0
>Reporter: jackray wang
>Assignee: yisha zhou
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> auto-unassigned, pull-request-available, sprint
>
> {code:java}
> //sql
> CREATE TABLE sink_kafka ( subtype STRING , svt STRING ) WITH (……)
> {code}
>  
> {code:java}
> //sql
> CREATE TABLE source_kafka ( subtype STRING , svt STRING ) WITH (……)
> {code}
>  
> {code:java}
> //scala udf
> class ScalaUpper extends ScalarFunction {
> def eval(str: String) : String= { 
>if(str == null){
>return ""
>}else{
>return str
>}
> }
> 
> }
> btenv.registerFunction("scala_upper", new ScalaUpper())
> {code}
>  
> {code:java}
> //sql
> insert into sink_kafka select subtype, scala_upper(svt)  from source_kafka
> {code}
>  
>  
> 
> Sometimes the svt's value is null, and the JSON inserted into Kafka looks like  
> \{"subtype":"qin","svt":null}
> If the amount of data is small, it is acceptable, but we process 10TB of data 
> every day, and there may be many nulls in the JSON, which affects the 
> efficiency. If a parameter could be added to remove null keys when defining a 
> sink table, the performance would be greatly improved
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34536) Support reading long value as Timestamp column in JSON format

2024-02-29 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822357#comment-17822357
 ] 

Benchao Li commented on FLINK-34536:


It sounds like a useful feature from the user perspective.

Usually a numeric representation is a unixtime, and this would be much like 
{{cast(numeric as timestamp[_ltz])}}, which has been discussed in 
[FLIP-162|https://cwiki.apache.org/confluence/display/FLINK/FLIP-162%3A+Consistent+Flink+SQL+time+function+behavior#FLIP162:ConsistentFlinkSQLtimefunctionbehavior-2.DisableCASTbetweenNUMERICandTIMESTAMP].
 Hence it should be {{TIMESTAMP_LTZ}} instead of {{TIMESTAMP}} that is allowed 
to be converted from numeric.

Besides, the precision of a unixtime has multiple choices 
(second/millisecond/microsecond), so how would you propose to distinguish them 
when converting to {{TIMESTAMP_LTZ}}?

For now, users can handle the conversion in a SQL expression via 
{{to_timestamp_ltz}} as an alternative, so I don't have a strong opinion on 
whether to introduce this feature at the format level.

Anyway, I think this deserves a FLIP, since the JSON format is a very fundamental 
format and this would be a public API change.
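For reference, the SQL-level workaround mentioned above looks roughly like this (a hedged sketch; the table and column names are made up, and `3` selects millisecond precision):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ToTimestampLtzSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // hypothetical table source_kafka with a BIGINT column epoch_millis
        tEnv.executeSql(
                "SELECT TO_TIMESTAMP_LTZ(epoch_millis, 3) AS event_time, subtype FROM source_kafka")
            .print();
    }
}
{code}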

> Support reading long value as Timestamp column in JSON format
> -
>
> Key: FLINK-34536
> URL: https://issues.apache.org/jira/browse/FLINK-34536
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.19.0
>Reporter: yisha zhou
>Priority: Major
>
> In many scenarios, timestamp data is stored as a Long value and expected to be 
> operated on as a Timestamp value. It's not user-friendly to use a UDF to convert 
> the data before operating on it.
> Meanwhile, the Avro format seems to accept several types of input and 
> convert them into TimestampData. Hopefully the same ability can be introduced into 
> the JSON format.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]

2024-02-29 Thread via GitHub


JustinLeesin commented on PR #24397:
URL: https://github.com/apache/flink/pull/24397#issuecomment-1972385924

   > @JustinLeesin Could you please cherry pick it to release-1.18. I'll merge 
to 1.18 firstly.
   @luoyuxia Done: [24419](https://github.com/apache/flink/pull/24419)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [BP-1.18][FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]

2024-02-29 Thread via GitHub


JustinLeesin opened a new pull request, #24419:
URL: https://github.com/apache/flink/pull/24419

   1.18 backport for parent PR 
[24397](https://github.com/apache/flink/pull/24397) 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-34521) Using the Duration instead of the deprecated Time classes

2024-02-29 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan closed FLINK-34521.
---
Resolution: Duplicate

> Using the Duration instead of the deprecated Time classes
> -
>
> Key: FLINK-34521
> URL: https://issues.apache.org/jira/browse/FLINK-34521
> Project: Flink
>  Issue Type: Technical Debt
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Fix For: 1.20.0
>
>
> FLINK-32570 deprecated org.apache.flink.api.common.time.Time and 
> org.apache.flink.streaming.api.windowing.time.Time.
> We should refactor all internal callers from Time to Duration. (Public 
> callers should be removed in 2.0).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34521) Using the Duration instead of the deprecated Time classes

2024-02-29 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822350#comment-17822350
 ] 

Rui Fan commented on FLINK-34521:
-

Thanks for the reminder, I'll close this Jira.

> Using the Duration instead of the deprecated Time classes
> -
>
> Key: FLINK-34521
> URL: https://issues.apache.org/jira/browse/FLINK-34521
> Project: Flink
>  Issue Type: Technical Debt
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Fix For: 1.20.0
>
>
> FLINK-32570 deprecated org.apache.flink.api.common.time.Time and 
> org.apache.flink.streaming.api.windowing.time.Time.
> We should refactor all internal callers from Time to Duration. (Public 
> callers should be removed in 2.0).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32076][checkpoint] Introduce file pool to reuse files [flink]

2024-02-29 Thread via GitHub


flinkbot commented on PR #24418:
URL: https://github.com/apache/flink/pull/24418#issuecomment-1972349443

   
   ## CI report:
   
   * 61e97462d4566a1ae8f17ceb32e42f68f92631a6 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32076) Add file pool for concurrent file reusing

2024-02-29 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32076:
---
Labels: pull-request-available  (was: )

> Add file pool for concurrent file reusing
> -
>
> Key: FLINK-32076
> URL: https://issues.apache.org/jira/browse/FLINK-32076
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Hangxiang Yu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-32076][checkpoint] Introduce file pool to reuse files [flink]

2024-02-29 Thread via GitHub


masteryhx opened a new pull request, #24418:
URL: https://github.com/apache/flink/pull/24418

   
   
   ## What is the purpose of the change
   
   Add file pool for concurrent file reusing
   
   
   ## Brief change log
   
 - Introduce PhysicalFilePool and use it in FileMergingSnapshotManager
 - Support different types of PhysicalFilePool
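
   The pooling idea, roughly (an illustrative sketch only, not the actual 
PhysicalFilePool API introduced by this PR):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Illustrative only: a pool that hands out previously used files for reuse. */
final class FilePoolSketch<F> {
    private final BlockingQueue<F> reusableFiles = new LinkedBlockingQueue<>();
    private final boolean blocking;

    FilePoolSketch(boolean blocking) {
        this.blocking = blocking;
    }

    /** Return a file to the pool so that later writers can reuse it. */
    void release(F file) {
        reusableFiles.offer(file);
    }

    /**
     * A blocking pool waits until a pooled file becomes available; a
     * non-blocking pool returns null immediately so the caller can create a
     * fresh physical file instead.
     */
    F acquire() throws InterruptedException {
        return blocking ? reusableFiles.take() : reusableFiles.poll();
    }
}
```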
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added testConcurrentFileReusingWithNonBlockingPool and 
testConcurrentFileReusingWithBlockingPool* 
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), **Checkpointing**, Kubernetes/Yarn, ZooKeeper: (yes / no / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33989) Insert Statement With Filter Operation Generates Extra Tombstone using Upsert Kafka Connector

2024-02-29 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822337#comment-17822337
 ] 

xuyang commented on FLINK-33989:


Agree with [~libenchao]. This is behavior by design.

> Insert Statement With Filter Operation Generates Extra Tombstone using Upsert 
> Kafka Connector
> -
>
> Key: FLINK-33989
> URL: https://issues.apache.org/jira/browse/FLINK-33989
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Table SQL / Runtime
>Affects Versions: 1.17.2
>Reporter: Flaviu Cicio
>Priority: Major
>
> Given the following Flink SQL tables:
> {code:sql}
> CREATE TABLE input (
>   id STRING NOT NULL, 
>   current_value STRING NOT NULL, 
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka', 
>   'topic' = 'input', 
>   'key.format' = 'raw', 
>   'properties.bootstrap.servers' = 'kafka:29092', 
>   'properties.group.id' = 'your_group_id', 
>   'value.format' = 'json'
> );
> CREATE TABLE output (
>   id STRING NOT NULL, 
>   current_value STRING NOT NULL, 
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka', 
>   'topic' = 'output', 
>   'key.format' = 'raw', 
>   'properties.bootstrap.servers' = 'kafka:29092', 
>   'properties.group.id' = 'your_group_id', 
>   'value.format' = 'json'
> ); {code}
> And, the following entries are present in the input Kafka topic:
> {code:json}
> [
>   {
>     "id": "1",
>     "current_value": "abc"
>   },
>   {
>     "id": "1",
>     "current_value": "abcd"
>   }
> ]{code}
> If we execute the following statement:
> {code:sql}
> INSERT INTO output SELECT id, current_value FROM input; {code}
> The following entries are published to the output Kafka topic:
> {code:json}
> [
>   {
>     "id": "1",
>     "current_value": "abc"
>   },
>   {
>     "id": "1",
>     "current_value": "abcd"
>   }
> ]{code}
> But, if we execute the following statement:
> {code:sql}
> INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1'); 
> {code}
> The following entries are published:
> {code:json}
> [
>   {
>     "id": "1",
>     "current_value": "abc"
>   },
>   null,
>   {
>     "id": "1",
>     "current_value": "abcd"
>   }
> ]{code}
> We would expect the result to be the same for both insert statements.
> As we can see, there is an extra tombstone generated as a result of the 
> second statement.
>  
> Moreover, if we make a select on the input table:
> {code:sql}
> SELECT * FROM input;
> {code}
> We will get the following entries:
> ||op||id||current_value||
> |I|1|abc|
> |-U|1|abc|
> |+U|1|abcd|
> We expected to see only the insert and the update_after entries.
> The update_before is added at DeduplicateFunctionHelper#122.
> This is easily reproducible with this test that we added in the 
> UpsertKafkaTableITCase from flink-connector-kafka:
> {code:java}
> @Test
> public void testAggregateFilterOmit() throws Exception {
> String topic = COUNT_FILTER_TOPIC + "_" + format;
> createTestTopic(topic, 1, 1);
> env.setParallelism(1);
> // -   test   ---
> countFilterToUpsertKafkaOmitUpdateBefore(topic);
> // - clean up ---
> deleteTestTopic(topic);
> }
> private void countFilterToUpsertKafkaOmitUpdateBefore(String table) 
> throws Exception {
> String bootstraps = getBootstrapServers();
> List data =
> Arrays.asList(
> Row.of(1, "Hi"),
> Row.of(1, "Hello"),
> Row.of(2, "Hello world"),
> Row.of(2, "Hello world, how are you?"),
> Row.of(2, "I am fine."),
> Row.of(3, "Luke Skywalker"),
> Row.of(3, "Comment#1"),
> Row.of(3, "Comment#2"),
> Row.of(4, "Comment#3"),
> Row.of(4, null));
> final String createSource =
> String.format(
> "CREATE TABLE aggfilter_%s ("
> + "  `id` INT,\n"
> + "  `comment` STRING\n"
> + ") WITH ("
> + "  'connector' = 'values',"
> + "  'data-id' = '%s'"
> + ")",
> format, TestValuesTableFactory.registerData(data));
> tEnv.executeSql(createSource);
> final String createSinkTable =
> String.format(
> "CREATE TABLE %s (\n"
> + "  `id` INT,\n"
> + "  `comment` 

Re: [PR] [FLINK-34522][core] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter [flink]

2024-02-29 Thread via GitHub


flinkbot commented on PR #24417:
URL: https://github.com/apache/flink/pull/24417#issuecomment-1972320798

   
   ## CI report:
   
   * c116a9c297479ebddb59864bda83ec8dfd43bb4b UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-34522][core] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter [flink]

2024-02-29 Thread via GitHub


RocMarshal opened a new pull request, #24417:
URL: https://github.com/apache/flink/pull/24417

   
   
   - What is the purpose of the change
   [FLINK-32570](https://issues.apache.org/jira/browse/FLINK-32570) deprecated 
the Time class and refactored all Public or PublicEvolving APIs to use Java's 
Duration.
   
   StateTtlConfig.Builder#cleanupInRocksdbCompactFilter is still using the Time 
class. In general, we would expect to:
   
   Mark cleanupInRocksdbCompactFilter(long, Time) as @Deprecated
   Provide a new cleanupInRocksdbCompactFilter(long, Duration)
   
   However, this method was only introduced in 1.19 
([FLINK-30854](https://issues.apache.org/jira/browse/FLINK-30854)), so a better 
solution may be to only provide cleanupInRocksdbCompactFilter(long, Duration) and 
not use Time at all.
   
   The deprecated API should be kept for 2 minor versions. IIUC, we cannot remove 
the Time-related classes in Flink 2.0 if we don't deprecate them in 1.19. If so, 
I think it's better to merge this JIRA in 1.19.0 as well.
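   
   As a rough sketch of the intended usage after this change (the Duration-based 
`newBuilder` overload is an assumption carried over from the same FLINK-32570 
refactoring; only the `cleanupInRocksdbCompactFilter(long, Duration)` variant is 
what this PR introduces):

```java
import java.time.Duration;

import org.apache.flink.api.common.state.StateTtlConfig;

class TtlConfigExample {
    static StateTtlConfig buildTtlConfig() {
        // Assumed Duration-based builder entry point from the FLINK-32570 work.
        return StateTtlConfig.newBuilder(Duration.ofDays(7))
                // New (long, Duration) variant: re-query the current timestamp
                // after every 1000 processed state entries and force a periodic
                // RocksDB compaction every 24 hours.
                .cleanupInRocksdbCompactFilter(1000L, Duration.ofHours(24))
                .build();
    }
}
```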
   
   - Brief change log
   [[FLINK-34522](https://issues.apache.org/jira/browse/FLINK-34522)][core] 
Changing the Time to Duration for 
StateTtlConfig.Builder.cleanupInRocksdbCompactFilter
   
   
   
   
   Fix based on https://github.com/apache/flink/pull/24388   & 
https://github.com/apache/flink/pull/24387#issuecomment-1971427877


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34550) attempted task still report metric of currentEmitEventTimeLag

2024-02-29 Thread siwei.gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

siwei.gao updated FLINK-34550:
--
Attachment: image-2024-03-01-09-51-08-909.png

> attempted task still report metric of currentEmitEventTimeLag
> -
>
> Key: FLINK-34550
> URL: https://issues.apache.org/jira/browse/FLINK-34550
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
> Environment: flink version:1.17.1  
> kafka-connector:1.17.1
>Reporter: siwei.gao
>Priority: Major
>  Labels: streamsource
> Attachments: image-2024-02-29-21-41-01-709.png, 
> image-2024-02-29-21-43-18-340.png, image-2024-02-29-21-50-55-160.png, 
> image-2024-03-01-09-51-08-909.png
>
>
> Attempted task still report metric of currentEmitEventTimeLag when use 
> kafka-connector.Attempt_num for reporting indicators of multiple tasks with 
> the same subtask_index but different task_attempt_num times within the same 
> time period.  !image-2024-02-29-21-43-18-340.png|width=990,height=237!
> Only the metric which  tash_attempt_num is 4 should be reported normally.
> This condition shows in taskmanager with multiple slots and it's ok when 
> taskmanager only has one slot.
> !image-2024-02-29-21-50-55-160.png|width=973,height=730!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34550) attempted task still report metric of currentEmitEventTimeLag

2024-02-29 Thread siwei.gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

siwei.gao updated FLINK-34550:
--
Description: 
Attempted tasks still report the currentEmitEventTimeLag metric when using the 
kafka-connector. Metrics are reported for multiple tasks with the same 
subtask_index but different task_attempt_num values within the same time period.  
!image-2024-03-01-09-51-08-909.png|width=992,height=242!

Only the metric whose task_attempt_num is 4 should be reported.

This condition occurs in a TaskManager with multiple slots; it does not occur 
when the TaskManager has only one slot.

!image-2024-02-29-21-50-55-160.png|width=973,height=730!

 

  was:
Attempted task still report metric of currentEmitEventTimeLag when use 
kafka-connector.Attempt_num for reporting indicators of multiple tasks with the 
same subtask_index but different task_attempt_num times within the same time 
period.  !image-2024-02-29-21-43-18-340.png|width=990,height=237!

Only the metric which  tash_attempt_num is 4 should be reported normally.

This condition shows in taskmanager with multiple slots and it's ok when 
taskmanager only has one slot.

!image-2024-02-29-21-50-55-160.png|width=973,height=730!

 


> attempted task still report metric of currentEmitEventTimeLag
> -
>
> Key: FLINK-34550
> URL: https://issues.apache.org/jira/browse/FLINK-34550
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
> Environment: flink version:1.17.1  
> kafka-connector:1.17.1
>Reporter: siwei.gao
>Priority: Major
>  Labels: streamsource
> Attachments: image-2024-02-29-21-41-01-709.png, 
> image-2024-02-29-21-43-18-340.png, image-2024-02-29-21-50-55-160.png, 
> image-2024-03-01-09-51-08-909.png
>
>
> Attempted task still report metric of currentEmitEventTimeLag when use 
> kafka-connector.Attempt_num for reporting indicators of multiple tasks with 
> the same subtask_index but different task_attempt_num times within the same 
> time period.  !image-2024-03-01-09-51-08-909.png|width=992,height=242!
> Only the metric which  tash_attempt_num is 4 should be reported normally.
> This condition shows in taskmanager with multiple slots and it's ok when 
> taskmanager only has one slot.
> !image-2024-02-29-21-50-55-160.png|width=973,height=730!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34550) attempted task still report metric of currentEmitEventTimeLag

2024-02-29 Thread siwei.gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

siwei.gao updated FLINK-34550:
--
Description: 
Attempted tasks still report the currentEmitEventTimeLag metric when using the 
kafka-connector. Metrics are reported for multiple tasks with the same 
subtask_index but different task_attempt_num values within the same time period.  
!image-2024-02-29-21-43-18-340.png|width=990,height=237!

Only the metric whose task_attempt_num is 4 should be reported.

This condition occurs in a TaskManager with multiple slots; it does not occur 
when the TaskManager has only one slot.

!image-2024-02-29-21-50-55-160.png|width=973,height=730!

 

  was:
Attempted task still report metric of currentEmitEventTimeLag when use 
kafka-connector.Attempt_num for reporting indicators of multiple tasks with the 
same subtask_index but different task_attempt_num times within the same time 
period. !image-2024-02-29-21-43-18-340.png!

Only the metric which  tash_attempt_num is 4 should be reported normally.

This condition shows in taskmanager with multiple slots and it's ok when 
taskmanager only has one slot.

!image-2024-02-29-21-50-55-160.png!

 


> attempted task still report metric of currentEmitEventTimeLag
> -
>
> Key: FLINK-34550
> URL: https://issues.apache.org/jira/browse/FLINK-34550
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
> Environment: flink version:1.17.1  
> kafka-connector:1.17.1
>Reporter: siwei.gao
>Priority: Major
>  Labels: streamsource
> Attachments: image-2024-02-29-21-41-01-709.png, 
> image-2024-02-29-21-43-18-340.png, image-2024-02-29-21-50-55-160.png
>
>
> Attempted task still report metric of currentEmitEventTimeLag when use 
> kafka-connector.Attempt_num for reporting indicators of multiple tasks with 
> the same subtask_index but different task_attempt_num times within the same 
> time period.  !image-2024-02-29-21-43-18-340.png|width=990,height=237!
> Only the metric which  tash_attempt_num is 4 should be reported normally.
> This condition shows in taskmanager with multiple slots and it's ok when 
> taskmanager only has one slot.
> !image-2024-02-29-21-50-55-160.png|width=973,height=730!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34553) Time travel support by Flink catalogs

2024-02-29 Thread Feng Jin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822332#comment-17822332
 ] 

Feng Jin commented on FLINK-34553:
--

From what I currently know, only Paimon supports Flink's time travel. For more 
details, please refer to the documentation: 

https://paimon.apache.org/docs/master/how-to/querying-tables/#batch-time-travel


And the source code: 
https://github.com/apache/incubator-paimon/blob/master/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
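
For the custom-catalog route, here is a minimal sketch of what overriding the 
time-travel hook could look like (the snapshot lookup itself is hypothetical; 
only the getTable(ObjectPath, long) method comes from the Catalog interface 
referenced in the Flink docs linked in the issue):

{code:java}
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;

/** Sketch only: a catalog that answers time-travel lookups by timestamp. */
class TimeTravelCatalogSketch extends GenericInMemoryCatalog {

    TimeTravelCatalogSketch(String name) {
        super(name);
    }

    @Override
    public CatalogBaseTable getTable(ObjectPath tablePath, long timestamp)
            throws TableNotExistException {
        // Hypothetical: resolve the table metadata that was current at the given
        // epoch-millisecond timestamp, e.g. from a versioned metadata store.
        CatalogBaseTable versioned = lookupSnapshot(tablePath, timestamp);
        if (versioned == null) {
            throw new TableNotExistException(getName(), tablePath);
        }
        return versioned;
    }

    private CatalogBaseTable lookupSnapshot(ObjectPath path, long timestamp) {
        // Placeholder for a user-provided versioned-metadata lookup.
        return null;
    }
}
{code}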

> Time travel support by Flink catalogs
> -
>
> Key: FLINK-34553
> URL: https://issues.apache.org/jira/browse/FLINK-34553
> Project: Flink
>  Issue Type: Technical Debt
>Reporter: Mehmet Aktas
>Priority: Major
>
> I am trying to add time travel support for the Flink backend in 
> [Ibis|https://github.com/ibis-project/ibis].
> I found that Flink requires the {{catalog}} to implement 
> {{getTable(ObjectPath tablePath, long timestamp)}} for time travel support:
> Attention: Currently, time travel requires the corresponding catalog that the 
> table belongs to implementing the getTable(ObjectPath tablePath, long 
> timestamp) method. See more details in Catalog.
> [[https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/time-travel/]]
> The default {{GenericInMemoryCatalog}} does not seem to implement 
> {{getTable()}}. I set up a {{hive metastore}} and created a {{hive catalog}}, 
> but it turns out that the hive catalog also does not implement {{getTable()}} 
> -- I wish the Flink docs were more detailed about this ...
> py4j.protocol.Py4JJavaError: An error occurred while calling o8.sqlQuery. : 
> org.apache.flink.table.api.ValidationException: SQL validation failed. 
> getTable(ObjectPath, long) is not implemented for class 
> org.apache.flink.table.catalog.hive.HiveCatalog.
> I have two options now to continue with this: * Try another catalog, like the 
> Iceberg catalog, to see if that one implements {{getTable()}}
>  ** I am not able to find information on whether a given catalog implements 
> this function. Should I dig into their source code to figure this out, or is 
> there any other way to find out?
>  * Implement a custom catalog with {{getTable()}}, following the *very brief 
> instructions* given in the [Flink 
> doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#interface-in-catalog-for-supporting-time-travel].
> Is there a doc, article, mailing list or anything else that I can use to get 
> more information on
>  * Which catalogs implement time travel support?
>  * If there is not readily available catalog supporting time travel, then how 
> can we implement a custom catalog with time travel support?
>  ** The instructions given 
> [here|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#interface-in-catalog-for-supporting-time-travel]
>  are unfortunately not sufficient for a Java-illiterate person like myself.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]

2024-02-29 Thread via GitHub


luoyuxia commented on PR #24397:
URL: https://github.com/apache/flink/pull/24397#issuecomment-1972271813

   @JustinLeesin Could you please cherry-pick it to release-1.18? I'll merge it 
to 1.18 first.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34542][docs] improved example quickstart build.gradle [flink]

2024-02-29 Thread via GitHub


flinkbot commented on PR #24416:
URL: https://github.com/apache/flink/pull/24416#issuecomment-1972195868

   
   ## CI report:
   
   * 0d64603764567494fefd05b5847a60795af949fd UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34542) Improve Gradle Quick Start build.gradle with Better Gradle API Alternatives

2024-02-29 Thread Lennon Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lennon Yu updated FLINK-34542:
--
Description: 
This is a ticket of misc. improvements on the build.gradle script provided at 
{{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> 
{{Getting Started:}}
 * Add {{mergeServiceFiles()}} call to the {{shadowJar}} configuration block
 ** Absence of this will cause class-not-found errors in SPI related class 
loading if the user has multiple connectors/formats in their implementation.
 * Move the top level {{mainClassName}} project property setting into 
application \{ mainClass = 'foo.Bar' }
 ** This is because the top-level mainClassName property will be deprecated in 
Gradle 9.0+
 * Replace the use of {{sourceCompatibility}} and {{targetCompatibility}} 
properties with java \{ toolchain \{ languageVersion = 
JavaLanguageVersion.of(11) \} \}
 ** This is the way recommended by Gradle to streamline language version 
configuration.
 ** Also the original configured Java version - Java 8 - is getting close to 
its terminal support phase, and it's better to move on to Java 11.

  was:
This is a ticket of misc. improvements on the build.gradle script provided at 
{{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> 
{{Getting Started:}}
 * Add {{mergeServiceFiles()}} call to the {{shadowJar}} configuration block
 ** Absence of this will cause class-not-found errors in SPI related class 
loading if the user has multiple connectors/formats in their implementation.
 * Move the top level {{mainClassName}} project property setting into 
application \{ mainClass = 'foo.Bar' }
 ** This is because the top-level mainClassName property will be deprecated in 
Gradle 9.0+
 * Replace the use of {{sourceCompatibility}} and {{targetCompatibility}} 
properties with java \{ toolChain { languageVersion = 
JavaLanguageVersion.of(11) } }
 ** This is the recommended way by Gradle to streamline langauge version 
configuration.
 ** Also the original configured Java version - Java 8 - is getting close to 
its terminal support phase, and it's better to move on to Java 11.


> Improve Gradle Quick Start build.gradle with Better Gradle API Alternatives
> ---
>
> Key: FLINK-34542
> URL: https://issues.apache.org/jira/browse/FLINK-34542
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Lennon Yu
>Priority: Minor
>  Labels: pull-request-available
>
> This is a ticket of misc. improvements on the build.gradle script provided at 
> {{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> 
> {{Getting Started:}}
>  * Add {{mergeServiceFiles()}} call to the {{shadowJar}} configuration block
>  ** Absence of this will cause class-not-found errors in SPI related class 
> loading if the user has multiple connectors/formats in their implementation.
>  * Move the top level {{mainClassName}} project property setting into 
> application \{ mainClass = 'foo.Bar' }
>  ** This is because the top-level mainClassName property will be deprecated 
> in Gradle 9.0+
>  * Replace the use of {{sourceCompatibility}} and {{targetCompatibility}} 
> properties with java \{ toolChain \{ languageVersion = 
> JavaLanguageVersion.of(11) \} \}
>  ** This is the recommended way by Gradle to streamline langauge version 
> configuration.
>  ** Also the original configured Java version - Java 8 - is getting close to 
> its terminal support phase, and it's better to move on to Java 11.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34542) Improve Gradle Quick Start build.gradle with Better Gradle API Alternatives

2024-02-29 Thread Lennon Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lennon Yu updated FLINK-34542:
--
Description: 
This is a ticket of misc. improvements on the build.gradle script provided at 
{{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> 
{{Getting Started:}}
 * Add {{mergeServiceFiles()}} call to the {{shadowJar}} configuration block
 ** Absence of this will cause class-not-found errors in SPI related class 
loading if the user has multiple connectors/formats in their implementation.
 * Move the top level {{mainClassName}} project property setting into 
application \{ mainClass = 'foo.Bar' }
 ** This is because the top-level mainClassName property will be deprecated in 
Gradle 9.0+
 * Replace the use of {{sourceCompatibility}} and {{targetCompatibility}} 
properties with java \{ toolchain \{ languageVersion = 
JavaLanguageVersion.of(11) \} \}
 ** This is the way recommended by Gradle to streamline language version 
configuration.
 ** Also the original configured Java version - Java 8 - is getting close to 
its terminal support phase, and it's better to move on to Java 11.

  was:
This is a ticket of misc. improvements on the build.gradle script provided at 
{{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> 
{{Getting Started:}}
 * Add {{mergeServiceFiles()}} call to the {{shadowJar}} configuration block
 ** Absence of this will cause class-not-found errors in SPI related class 
loading if the user has multiple connectors/formats in their implementation.
 * Move the top level {{mainClassName}} project property setting into 
application \{ mainClass = 'foo.Bar' }
 ** This is because the top-level mainClassName property will be deprecated in 
Gradle 9.0+
 * Replace the use of {{sourceCompatibility}} and {{targetCompatibility}} 
properties with java \{ toolChain \{ languageVersion = 
JavaLanguageVersion.of(17) } }
 ** This is the recommended way by Gradle to streamline langauge version 
configuration.


> Improve Gradle Quick Start build.gradle with Better Gradle API Alternatives
> ---
>
> Key: FLINK-34542
> URL: https://issues.apache.org/jira/browse/FLINK-34542
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Lennon Yu
>Priority: Minor
>  Labels: pull-request-available
>
> This is a ticket of misc. improvements on the build.gradle script provided at 
> {{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> 
> {{Getting Started:}}
>  * Add {{mergeServiceFiles()}} call to the {{shadowJar}} configuration block
>  ** Absence of this will cause class-not-found errors in SPI related class 
> loading if the user has multiple connectors/formats in their implementation.
>  * Move the top level {{mainClassName}} project property setting into 
> application \{ mainClass = 'foo.Bar' }
>  ** This is because the top-level mainClassName property will be deprecated 
> in Gradle 9.0+
>  * Replace the use of {{sourceCompatibility}} and {{targetCompatibility}} 
> properties with java \{ toolChain { languageVersion = 
> JavaLanguageVersion.of(11) } }
>  ** This is the recommended way by Gradle to streamline langauge version 
> configuration.
>  ** Also the original configured Java version - Java 8 - is getting close to 
> its terminal support phase, and it's better to move on to Java 11.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34542) Improve Gradle Quick Start build.gradle with Better Gradle API Alternatives

2024-02-29 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34542:
---
Labels: pull-request-available  (was: )

> Improve Gradle Quick Start build.gradle with Better Gradle API Alternatives
> ---
>
> Key: FLINK-34542
> URL: https://issues.apache.org/jira/browse/FLINK-34542
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Lennon Yu
>Priority: Minor
>  Labels: pull-request-available
>
> This is a ticket of misc. improvements on the build.gradle script provided at 
> {{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> 
> {{Getting Started:}}
>  * Add {{mergeServiceFiles()}} call to the {{shadowJar}} configuration block
>  ** Absence of this will cause class-not-found errors in SPI related class 
> loading if the user has multiple connectors/formats in their implementation.
>  * Move the top level {{mainClassName}} project property setting into 
> application \{ mainClass = 'foo.Bar' }
>  ** This is because the top-level mainClassName property will be deprecated 
> in Gradle 9.0+
>  * Replace the use of {{sourceCompatibility}} and {{targetCompatibility}} 
> properties with java \{ toolChain \{ languageVersion = 
> JavaLanguageVersion.of(17) } }
>  ** This is the recommended way by Gradle to streamline langauge version 
> configuration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-34542][docs] improved example quickstart build.gradle [flink]

2024-02-29 Thread via GitHub


LemonU opened a new pull request, #24416:
URL: https://github.com/apache/flink/pull/24416

   
   
   ## What is the purpose of the change
   
   To improve the `build.gradle` build script provided in the Flink 
documentation under section `Application Development >> Project Configuration 
>> Overview >> Getting Started`.
   
   ## Brief change log
   
   - Replaced some project setup calls that are about to be deprecated in newer 
Gradle versions.
   - Enabled service file merging in the `shadowJar` extension to allow having 
multiple connector/format jars on the project classpath.
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34535] Support JobPlanInfo for the explain result [flink]

2024-02-29 Thread via GitHub


jeyhunkarimov commented on code in PR #24411:
URL: https://github.com/apache/flink/pull/24411#discussion_r1508294826


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala:
##
@@ -1824,6 +1826,11 @@ object TableTestUtil {
 s.replaceAll("\"id\"\\s*:\\s*\\d+", "\"id\" : ").trim
   }
 
+  /** JobJson {jid} is ignored. */
+  def replaceJobId(s: String): String = {
+s.replaceAll("\"jid\"\\s*:\\s*\"[\\w.-]*\"", "\"jid\":\"\"").trim

Review Comment:
   This might also replace a string literal that happens to contain a `"jid..."` 
pattern. We should make sure that only field values are targeted in this case.



##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala:
##
@@ -1093,6 +1093,8 @@ abstract class TableTestUtilBase(test: TableTestBase, 
isStreamingMode: Boolean)
 case ExplainDetail.ESTIMATED_COST => replaceEstimatedCost(result)
 case ExplainDetail.JSON_EXECUTION_PLAN =>
   replaceNodeIdInOperator(replaceStreamNodeId(replaceStageId(result)))
+case ExplainDetail.JSON_JOB_PLAN =>

Review Comment:
   I would leave the old test as it is, and add your cases in another test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34491) Move from experimental support to production support for Java 17

2024-02-29 Thread Dhruv Patel (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruv Patel updated FLINK-34491:

Description: 
This task is to move away from experimental support for Java 17 to production 
support so that teams running Flink in production can migrate to Java 17 
successfully

*Background:*

Flink supports the protobuf data format to exchange messages between different 
operators, and the serialization and deserialization of these protobuf messages 
are performed by a library called "Kryo". In order to move away from the 
experimental support for Java 17 released as part of Flink 1.18.1, the Kryo 
library in Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0, because Kryo 
2.24.0 does not support Java 17. This improvement plan is tracked in 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0]

All Flink applications using protobuf currently generate state with Kryo v2. 
Once the above improvement plan is complete all Flink applications will fully 
support reading that state and write newer state with Kryo v5. However, latest 
Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent 
applications which are using snapshot mechanism to deploy their jobs to latest 
Flink version with Kryo v5 support without a bridge version running on Java 11. 
Applications will have to run on a bridge release version that will read their 
state with Kryo v2 data and write it with Kryo v5 data before upgrading to a 
future version of Flink that completely drops support for Kryo v2.

Basically, Flink applications using the protobuf data format cannot move directly 
from Java 8 to Java 17 without downtime after the Kryo v5 release in Flink. 
Applications will need to first move to Java 11 (the bridging version) and then 
move to Java 17 to have a safe deployment.

*Blocker for this task:*

Upgrade to Kryo 5.5.0 which supports Java 17 and a path for snapshot migration
https://issues.apache.org/jira/browse/FLINK-3154.

  was:
This task is to move away from experimental support for Java 17 to production 
support so that teams running Flink in production can migrate to Java 17 
successfully

*Background:*

Flink supports protobuf dataformat to exchange messages between different 
operators and the serialization and deserialization of those protobufs are 
performed by library called "Kryo". In order to move away from experimental 
support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 
1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not 
support Java 17. This improvement plan is tracked as part of this ticket 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0]

All Flink applications using protobuf currently generate state with Kryo v2. 
Once the above improvement plan is complete all Flink applications will fully 
support reading that state and write newer state with Kryo v5. However, latest 
Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent 
applications which are using snapshot mechanism to deploy their jobs to latest 
Flink version with Kryo v5 support without a bridge version running on Java 11. 
Applications will have to run on a bridge release version that will read their 
state with Kryo v2 data and write it with Kryo v5 data before upgrading to a 
future version of Flink that completely drops support for Kryo v2.

Basically, Flink applications using protobuf dataformat cannot move directly 
from Java 8 to Java 17 without downtime after the kryo v5 release in Flink. 
Applications will need to first move to Java 11 (bridging version) and then 
move to Java 17 to have a safe deployment.

*Blocker for this task:*

Upgrade to Kryo 5.5.0 which supports Java 17 and a path for snapshot migration
https://issues.apache.org/jira/browse/FLINK-3154.


> Move from experimental support to production support for Java 17
> 
>
> Key: FLINK-34491
> URL: https://issues.apache.org/jira/browse/FLINK-34491
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.18.1
>Reporter: Dhruv Patel
>Priority: Major
>
> This task is to move away from experimental support for Java 17 to production 
> support so that teams running Flink in production can migrate to Java 17 
> successfully
> *Background:*
> Flink supports protobuf dataformat to exchange messages between different 
> operators and the serialization and deserialization of these protobufs 
> messages are performed by library called "Kryo". In order to move away from 
> experimental support of Java 17 released as part of Flink 1.18.1, the Kryo 
> library in Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 
> 2.24.0 does not support Java 

[jira] [Commented] (FLINK-29370) Protobuf in flink-sql-protobuf is not shaded

2024-02-29 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822311#comment-17822311
 ] 

Jeyhun Karimov commented on FLINK-29370:


Hi [~tanjialiang] you might need to consider [this 
comment|https://github.com/apache/flink/pull/14376#issuecomment-1164395312] 
before relocating protobuf

> Protobuf in flink-sql-protobuf is not shaded
> 
>
> Key: FLINK-29370
> URL: https://issues.apache.org/jira/browse/FLINK-29370
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.16.0
>Reporter: Jark Wu
>Priority: Major
>
> The protobuf classes in flink-sql-protobuf is not shaded which may lead to 
> class conflicts. Usually, sql jars should shade common used dependencies, 
> e.g. flink-sql-avro: 
> https://github.com/apache/flink/blob/master/flink-formats/flink-sql-avro/pom.xml#L88
>  
> {code}
> ➜  Downloads jar tvf flink-sql-protobuf-1.16.0.jar | grep com.google
>  0 Tue Sep 13 20:23:44 CST 2022 com/google/
>  0 Tue Sep 13 20:23:44 CST 2022 com/google/protobuf/
>568 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/ProtobufInternalUtils.class
>  19218 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/AbstractMessage$Builder.class
>259 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/AbstractMessage$BuilderParent.class
>  10167 Tue Sep 13 20:23:44 CST 2022 com/google/protobuf/AbstractMessage.class
>   1486 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/AbstractMessageLite$Builder$LimitedInputStream.class
>  12399 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/AbstractMessageLite$Builder.class
>279 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/AbstractMessageLite$InternalOneOfEnu
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34554) Using EXACTLY_ONCE with KafkaSink cause broker's OOM due to newly created transactionalId per checkpoint

2024-02-29 Thread Hilmi Al Fatih (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hilmi Al Fatih updated FLINK-34554:
---
Description: 
Flink version: 1.17.1
Kafka Broker version: 2.7.1, with 4 GB of heap memory for each broker

Hi, we recently had an outage in our production system after we performed a Flink 
kafka-connector API upgrade. To give a brief context, our application is a simple 
kafka-to-kafka pipeline with minimal processing. We run in EXACTLY_ONCE mode, 
thus Kafka transactions are involved.

Our application runs with a total of around 350 sink subtasks. The checkpoint 
period was set to 5 seconds to avoid blocking {{read_committed}} consumers for 
too long. We recently performed an upgrade with the following details:

Previous state:
 * Flink version: 1.14.4
 * Broker version: 2.7.1
 * kafka connector API: FlinkKafkaProducer

Update to:
 * Flink version: 1.17.1
 * Broker version: 2.7.1
 * kafka connector API: KafkaSink

Around 10 hours after the deployment, our Kafka brokers started failing with OOM 
errors. The heap dump entries are dominated by ProducerStateEntry records.
Our investigation led us to a fundamental implementation difference between 
FlinkKafkaProducer and KafkaSink:
 * KafkaSink generates a different transactionalId for each checkpoint,
 * FlinkKafkaProducer uses a constant pool of transactionalIds.

With this behavior, KafkaSink seemed to exhaust our broker heap very fast and 
the ProducerStateEntry will only expire after 
[transactional.id.expiration.ms|http://transactional.id.expiration.ms/] , which 
by default is set to 7 days.  
([ref1|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/Log.scala#L677],
 
[ref2|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L268],
 
[ref3|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L1207])

For our job, this means it creates roughly:
 * after 10 hours of running: 350 ids * 12 checkpoints/minute * 60 min/hour * 10 
hours ~ 2,520,000 entries
 * after 7 days: ~ 42 million entries.

Attached below is the number of ProducerStateEntry entries in the heap dump at 
the time of the OOM:
 * 505,000 (6.5%); in total it would be roughly ~ 7,000,000 entries.

There are several things that come to mind to mitigate the drawbacks, such as:
 * Reduce the number of subtasks, which reduces the number of transactionalIds.
 * Enlarge the checkpoint period to reduce the rate of newly generated 
transactionalIds.
 * Shorten 
[transactional.id.expiration.ms|http://transactional.id.expiration.ms/] to 
expire unused transactionalIds sooner.
 * Increase the broker heap.

However, the above mitigations might be too cumbersome and need careful tuning, 
which harms our flexibility. In addition, because lingering transaction state is 
not tracked, the TransactionAborter seems to abort old transactions naively. We 
might accidentally (or purposefully) reuse the same transactionalIdPrefix and 
start the counter from 0. In that case, if an old transactionalId happens to have 
an epoch >0, the aborter will keep looping over nonexistent transactions up to 
the latest checkpoint counter (which may be very large) and leave the job stuck.

Btw, I am aware that for Flink 2.0 a lot of effort is going into better 
integration with Kafka transactions 
([FLIP-319|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710]).
FLIP-319 mentions something about TID pooling, but there seems to be no relevant 
page for it yet, so I wonder whether there is already a concrete plan I can 
follow; if there is something I can contribute to, I will be really happy to help.
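
For reference, a minimal sketch of the EXACTLY_ONCE sink setup described above 
(topic, servers, and the id prefix are placeholders, not values from this report):

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

class ExactlyOnceKafkaSinkExample {
    static KafkaSink<String> buildSink() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                // EXACTLY_ONCE opens a fresh Kafka transaction per checkpoint,
                // which is what produces the ever-growing set of transactional
                // ids discussed in this ticket.
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("my-app-prefix")
                .build();
    }
}
{code}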
 
 

  was:
Flink version: 1.17.1
Kafka Broker version: 2.7.1 * 4 GB heap memory for each

Hi, We recently had an outage in our production system after we perform a Flink 
kafka-connector API upgrade. To give a brief context, our application is a 
simple kafka-to-kafka pipeline with minimal processing. We run in EXACTLY_ONCE 
mode, thus kafka transaction is involved.

Our application runs with total around 350 sink subtask. Checkpoint period was 
set to 5 seconds to avoid blocking {{read_committed}} consumers too long. We 
recently performed an upgrade with the following details:

Previous state:
 * Flink version: 1.14.4
 * Broker version: 2.7.1
 * kafka connector API: FlinkKafkaProducer

Update to:
 * Flink version: 1.17.1
 * Broker version: 2.7.1
 * kafka connector API: KafkaSink

Around 10 hours after the deployment, our kafka broker started to failing with 
OOM error. Heap dump entries are dominated by the ProducerStateEntry records.
Our investigation leads to finding the total implementation change between 
FlinkKafkaProducer and KafkaSink. * KafkaSink generate different 
transactionalId for each checkpoint,
 * FlinkKafkaProducer uses constant set 

[jira] [Updated] (FLINK-34554) Using EXACTLY_ONCE with KafkaSink cause broker's OOM due to newly created transactionalId per checkpoint

2024-02-29 Thread Hilmi Al Fatih (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hilmi Al Fatih updated FLINK-34554:
---
Description: 
Flink version: 1.17.1
Kafka Broker version: 2.7.1 * 4 GB heap memory for each

Hi, We recently had an outage in our production system after we perform a Flink 
kafka-connector API upgrade. To give a brief context, our application is a 
simple kafka-to-kafka pipeline with minimal processing. We run in EXACTLY_ONCE 
mode, thus kafka transaction is involved.

Our application runs with total around 350 sink subtask. Checkpoint period was 
set to 5 seconds to avoid blocking {{read_committed}} consumers too long. We 
recently performed an upgrade with the following details:

Previous state:
 * Flink version: 1.14.4
 * Broker version: 2.7.1
 * kafka connector API: FlinkKafkaProducer

Update to:
 * Flink version: 1.17.1
 * Broker version: 2.7.1
 * kafka connector API: KafkaSink

Around 10 hours after the deployment, our kafka broker started to failing with 
OOM error. Heap dump entries are dominated by the ProducerStateEntry records.
Our investigation leads to finding the total implementation change between 
FlinkKafkaProducer and KafkaSink. * KafkaSink generate different 
transactionalId for each checkpoint,
 * FlinkKafkaProducer uses constant set of transactionalId pool.

With this behavior, KafkaSink seemed to exhaust our broker heap very fast and 
the ProducerStateEntry will only expire after 
[transactional.id.expiration.ms|http://transactional.id.expiration.ms/] , which 
by default is set to 7 days.  
([ref1|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/Log.scala#L677],
 
[ref2|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L268],
 
[ref3|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L1207])

For our job, it means it creates roughly:
 * 10 hour running) 350 ids * 12 times/minute * 60 min/hour * 10 hour ~ 
2,520,000
 * 7 days) ~ 42mil entries.

Attached below is the number of ProducerStateEntry entries of heap dump when it 
is OOM:
 * 505,000 (6.5%), in total it would be roughly ~ 7,000,000 entries.

There are several things that come up in our mind to mitigate the drawbacks 
such as:
 * reduce the number of subtasks, so it reduces the number of transactionalId
 * Enlarge the checkpoint period to reduce the newly generated transactionalId 
rate.
 * Shorten 
[transactional.id.expiration.ms|http://transactional.id.expiration.ms/] to 
expire the unused transactionalId soon.
 * Increase the broker heap

However, above mitigation might be too cumbersome and need careful tuning which 
harm our flexibility.In addition, due to the lack of maintaining lingering 
transaction state, TransactionAborter seems to abort old transaction naively. 
We might be accidentally (or purposefully) reuse the same transactionalIdPrefix 
and start the counter from 0. In that case, if the old transactionalId happens 
to have epoch >0, it will keep looping aborting the nonexistent transactions up 
to the latest checkpoint counter (which may be too big) and make the job stuck.

Btw, I am aware that in Flink 2.0, you guys are putting a lot of effort on 
creating better integration with Kafka transaction 
([FLIP-319|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710]).
 In FLIP-319, it mentions something about TID pooling. However, it is seem that 
there is no relevant page yet for it, so I wonder whether there are any 
concrete plan already that I can follow, or if there is something I can 
contribute to, I will be really happy to help.
 
 

  was:
Flink version: 1.17.1
Kafka Broker version: 2.7.1 * 4 GB heap memory for each

Hi,
We recently had an outage in our production system after we perform a Flink 
kafka-connector API upgrade. To give a brief context, our application is a 
simple kafka-to-kafka pipeline with minimal processing. We run in EXACTLY_ONCE 
mode, thus kafka transaction is involved.
Our application runs with total around 350 sink subtask. Checkpoint period was 
set to 5 seconds to avoid blocking {{read_committed}} consumers too long. We 
recently performed an upgrade with the following details: * Previous state:

 * Flink version: 1.14.4
 * Broker version: 2.7.1
 * kafka connector API: FlinkKafkaProducer

 * Update to:

 * Flink version: 1.17.1
 * Broker version: 2.7.1
 * kafka connector API: KafkaSink

Around 10 hours after the deployment, our kafka broker started to failing with 
OOM error. Heap dump entries are dominated by the ProducerStateEntry records.
Our investigation leads to finding the total implementation change between 
FlinkKafkaProducer and KafkaSink. * KafkaSink generate different 
transactionalId for each checkpoint,
 * FlinkKafkaProducer uses constant 

[jira] [Updated] (FLINK-34554) Using EXACTLY_ONCE with KafkaSink cause broker's OOM due to newly created transactionalId per checkpoint

2024-02-29 Thread Hilmi Al Fatih (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hilmi Al Fatih updated FLINK-34554:
---
Priority: Blocker  (was: Minor)

> Using EXACTLY_ONCE with KafkaSink cause broker's OOM due to newly created 
> transactionalId per checkpoint
> 
>
> Key: FLINK-34554
> URL: https://issues.apache.org/jira/browse/FLINK-34554
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka
>Affects Versions: 1.16.3, 1.17.2, 1.18.1
>Reporter: Hilmi Al Fatih
>Priority: Blocker
> Fix For: 1.16.3, 1.17.2, 1.18.1
>
> Attachments: image (4).png, image (5).png
>
>
> Flink version: 1.17.1
> Kafka Broker version: 2.7.1 * 4 GB heap memory for each
> Hi,
> We recently had an outage in our production system after we perform a Flink 
> kafka-connector API upgrade. To give a brief context, our application is a 
> simple kafka-to-kafka pipeline with minimal processing. We run in 
> EXACTLY_ONCE mode, thus kafka transaction is involved.
> Our application runs with total around 350 sink subtask. Checkpoint period 
> was set to 5 seconds to avoid blocking {{read_committed}} consumers too long. 
> We recently performed an upgrade with the following details: * Previous state:
>  * Flink version: 1.14.4
>  * Broker version: 2.7.1
>  * kafka connector API: FlinkKafkaProducer
>  * Update to:
>  * Flink version: 1.17.1
>  * Broker version: 2.7.1
>  * kafka connector API: KafkaSink
> Around 10 hours after the deployment, our kafka broker started to failing 
> with OOM error. Heap dump entries are dominated by the ProducerStateEntry 
> records.
> Our investigation leads to finding the total implementation change between 
> FlinkKafkaProducer and KafkaSink. * KafkaSink generate different 
> transactionalId for each checkpoint,
>  * FlinkKafkaProducer uses constant set of transactionalId pool.
> With this behavior, KafkaSink seemed to exhaust our broker heap very fast and 
> the ProducerStateEntry will only expire after 
> [transactional.id.expiration.ms|http://transactional.id.expiration.ms/] , 
> which by default is set to 7 days.  
> ([ref1|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/Log.scala#L677],
>  
> [ref2|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L268],
>  
> [ref3|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L1207])For
>  our job, it means it creates roughly:
> 10 hour running) 350 ids * 12 times/minute * 60 min/hour * 10 hour ~ 2,520,000
> 7 days) ~ 42mil entries.Here is attached the number of ProducerStateEntry 
> entries of heap dump when it is OOM:
> 505,000 (6.5%), in total it would be roughly ~ 7,000,000 entries.There are 
> several things that come up in our mind to mitigate the drawbacks such as: * 
> reduce the number of subtasks, so it reduces the number of transactionalId
>  * Enlarge the checkpoint period to reduce the newly generated 
> transactionalId rate.
>  * Shorten 
> [transactional.id.expiration.ms|http://transactional.id.expiration.ms/] to 
> expire the unused transactionalId soon.
>  * Increase the broker heap
> However, above mitigation might be too cumbersome and need careful tuning 
> which harm our flexibility.In addition, due to the lack of maintaining 
> lingering transaction state, TransactionAborter seems to abort old 
> transaction naively. We might be accidentally (or purposefully) reuse the 
> same transactionalIdPrefix and start the counter from 0. In that case, if the 
> old transactionalId happens to have epoch >0, it will keep looping aborting 
> the nonexistent transactions up to the latest checkpoint counter (which may 
> be too big) and make the job stuck.Btw, I am aware that in Flink 2.0, you 
> guys are putting a lot of effort on creating better integration with Kafka 
> transaction 
> ([FLIP-319|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710]).
>  In FLIP-319, it mentions something about TID pooling. However, it is seem 
> that there is no relevant page yet for it, so I wonder whether there are any 
> concrete plan already that I can follow, or if there is something I can 
> contribute to, I will be really happy to help.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34554) Using EXACTLY_ONCE with KafkaSink cause broker's OOM due to newly created transactionalId per checkpoint

2024-02-29 Thread Hilmi Al Fatih (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hilmi Al Fatih updated FLINK-34554:
---
Issue Type: Improvement  (was: New Feature)

> Using EXACTLY_ONCE with KafkaSink cause broker's OOM due to newly created 
> transactionalId per checkpoint
> 
>
> Key: FLINK-34554
> URL: https://issues.apache.org/jira/browse/FLINK-34554
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.16.3, 1.17.2, 1.18.1
>Reporter: Hilmi Al Fatih
>Priority: Blocker
> Fix For: 1.16.3, 1.17.2, 1.18.1
>
> Attachments: image (4).png, image (5).png
>
>
> Flink version: 1.17.1
> Kafka Broker version: 2.7.1 * 4 GB heap memory for each
> Hi,
> We recently had an outage in our production system after we perform a Flink 
> kafka-connector API upgrade. To give a brief context, our application is a 
> simple kafka-to-kafka pipeline with minimal processing. We run in 
> EXACTLY_ONCE mode, thus kafka transaction is involved.
> Our application runs with total around 350 sink subtask. Checkpoint period 
> was set to 5 seconds to avoid blocking {{read_committed}} consumers too long. 
> We recently performed an upgrade with the following details: * Previous state:
>  * Flink version: 1.14.4
>  * Broker version: 2.7.1
>  * kafka connector API: FlinkKafkaProducer
>  * Update to:
>  * Flink version: 1.17.1
>  * Broker version: 2.7.1
>  * kafka connector API: KafkaSink
> Around 10 hours after the deployment, our kafka broker started to failing 
> with OOM error. Heap dump entries are dominated by the ProducerStateEntry 
> records.
> Our investigation leads to finding the total implementation change between 
> FlinkKafkaProducer and KafkaSink. * KafkaSink generate different 
> transactionalId for each checkpoint,
>  * FlinkKafkaProducer uses constant set of transactionalId pool.
> With this behavior, KafkaSink seemed to exhaust our broker heap very fast and 
> the ProducerStateEntry will only expire after 
> [transactional.id.expiration.ms|http://transactional.id.expiration.ms/] , 
> which by default is set to 7 days.  
> ([ref1|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/Log.scala#L677],
>  
> [ref2|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L268],
>  
> [ref3|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L1207])For
>  our job, it means it creates roughly:
> 10 hour running) 350 ids * 12 times/minute * 60 min/hour * 10 hour ~ 2,520,000
> 7 days) ~ 42mil entries.Here is attached the number of ProducerStateEntry 
> entries of heap dump when it is OOM:
> 505,000 (6.5%), in total it would be roughly ~ 7,000,000 entries.There are 
> several things that come up in our mind to mitigate the drawbacks such as: * 
> reduce the number of subtasks, so it reduces the number of transactionalId
>  * Enlarge the checkpoint period to reduce the newly generated 
> transactionalId rate.
>  * Shorten 
> [transactional.id.expiration.ms|http://transactional.id.expiration.ms/] to 
> expire the unused transactionalId soon.
>  * Increase the broker heap
> However, above mitigation might be too cumbersome and need careful tuning 
> which harm our flexibility.In addition, due to the lack of maintaining 
> lingering transaction state, TransactionAborter seems to abort old 
> transaction naively. We might be accidentally (or purposefully) reuse the 
> same transactionalIdPrefix and start the counter from 0. In that case, if the 
> old transactionalId happens to have epoch >0, it will keep looping aborting 
> the nonexistent transactions up to the latest checkpoint counter (which may 
> be too big) and make the job stuck.Btw, I am aware that in Flink 2.0, you 
> guys are putting a lot of effort on creating better integration with Kafka 
> transaction 
> ([FLIP-319|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710]).
>  In FLIP-319, it mentions something about TID pooling. However, it is seem 
> that there is no relevant page yet for it, so I wonder whether there are any 
> concrete plan already that I can follow, or if there is something I can 
> contribute to, I will be really happy to help.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34554) Using EXACTLY_ONCE with KafkaSink cause broker's OOM due to newly created transactionalId per checkpoint

2024-02-29 Thread Hilmi Al Fatih (Jira)
Hilmi Al Fatih created FLINK-34554:
--

 Summary: Using EXACTLY_ONCE with KafkaSink cause broker's OOM due 
to newly created transactionalId per checkpoint
 Key: FLINK-34554
 URL: https://issues.apache.org/jira/browse/FLINK-34554
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kafka
Affects Versions: 1.18.1, 1.17.2, 1.16.3
Reporter: Hilmi Al Fatih
 Fix For: 1.18.1, 1.17.2, 1.16.3
 Attachments: image (4).png, image (5).png

Flink version: 1.17.1
Kafka Broker version: 2.7.1, with 4 GB heap memory for each broker

Hi,
We recently had an outage in our production system after we performed a Flink 
kafka-connector API upgrade. To give some brief context, our application is a 
simple kafka-to-kafka pipeline with minimal processing. We run in EXACTLY_ONCE 
mode, so kafka transactions are involved.
Our application runs with a total of around 350 sink subtasks. The checkpoint period 
was set to 5 seconds to avoid blocking {{read_committed}} consumers for too long. We 
recently performed an upgrade with the following details: * Previous state:

 * Flink version: 1.14.4
 * Broker version: 2.7.1
 * kafka connector API: FlinkKafkaProducer

 * Update to:

 * Flink version: 1.17.1
 * Broker version: 2.7.1
 * kafka connector API: KafkaSink

Around 10 hours after the deployment, our kafka broker started failing with 
OOM errors. Heap dump entries are dominated by ProducerStateEntry records.
Our investigation led us to a fundamental implementation change between 
FlinkKafkaProducer and KafkaSink: * KafkaSink generates a different 
transactionalId for each checkpoint,
 * FlinkKafkaProducer uses a constant pool of transactionalIds.

With this behavior, KafkaSink seemed to exhaust our broker heap very fast, and 
the ProducerStateEntry records only expire after 
[transactional.id.expiration.ms|http://transactional.id.expiration.ms/], which 
by default is set to 7 days.  
([ref1|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/Log.scala#L677],
 
[ref2|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L268],
 
[ref3|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L1207])For
 our job, this means it creates roughly:
over 10 hours of running) 350 ids * 12 times/minute * 60 min/hour * 10 hours ~ 2,520,000 entries
over 7 days) ~ 42 million entries. Attached is the number of ProducerStateEntry 
entries in the heap dump at the time of the OOM:
505,000 (6.5%); in total it would be roughly ~ 7,000,000 entries. There are 
several things that came to our mind to mitigate the drawbacks, such as: * 
reduce the number of subtasks, so it reduces the number of transactionalIds
 * Enlarge the checkpoint period to reduce the rate of newly generated 
transactionalIds.
 * Shorten 
[transactional.id.expiration.ms|http://transactional.id.expiration.ms/] to 
expire the unused transactionalId soon.
 * Increase the broker heap

However, the above mitigations might be too cumbersome and need careful tuning, 
which harms our flexibility. In addition, due to the lack of maintained state for 
lingering transactions, TransactionAborter seems to abort old transactions naively. 
We might accidentally (or purposefully) reuse the same transactionalIdPrefix 
and start the counter from 0. In that case, if the old transactionalId happens 
to have epoch >0, it will keep looping, aborting the nonexistent transactions up 
to the latest checkpoint counter (which may be too big), and make the job 
stuck. Btw, I am aware that in Flink 2.0, you guys are putting a lot of effort 
into creating better integration with Kafka transactions 
([FLIP-319|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710]).
FLIP-319 mentions something about TID pooling. However, it seems that 
there is no relevant page yet for it, so I wonder whether there is any 
concrete plan already that I can follow; if there is something I can 
contribute to, I will be really happy to help.
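For reference, a minimal sketch of the kind of sink setup described above (not our production code; the broker address and topic name are placeholders): an EXACTLY_ONCE KafkaSink with a fixed transactionalIdPrefix and a 5-second checkpoint interval, which is the combination under which every checkpoint opens a new transactional ID per sink subtask.
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExactlyOnceSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 5 s checkpoints, as in the job described above: every checkpoint opens a
        // new Kafka transaction (and hence a new transactional ID) per sink subtask.
        env.enableCheckpointing(5_000);

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092") // placeholder
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("output-topic") // placeholder
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // Prefix from which the per-subtask, per-checkpoint transactional IDs are derived.
                .setTransactionalIdPrefix("my-pipeline")
                // Must not exceed the broker's transaction.max.timeout.ms.
                .setProperty("transaction.timeout.ms", "900000")
                .build();

        env.fromElements("a", "b", "c").sinkTo(sink);
        env.execute("exactly-once sink sketch");
    }
}
{code}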
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34553) Time travel support by Flink catalogs

2024-02-29 Thread Mehmet Aktas (Jira)
Mehmet Aktas created FLINK-34553:


 Summary: Time travel support by Flink catalogs
 Key: FLINK-34553
 URL: https://issues.apache.org/jira/browse/FLINK-34553
 Project: Flink
  Issue Type: Technical Debt
Reporter: Mehmet Aktas


I am trying to add time travel support for the Flink backend in 
[Ibis|https://github.com/ibis-project/ibis].

I found that Flink requires the {{catalog}} to implement {{getTable(ObjectPath 
tablePath, long timestamp)}} for time travel support:
Attention: Currently, time travel requires the corresponding catalog that the 
table belongs to implementing the getTable(ObjectPath tablePath, long 
timestamp) method. See more details in Catalog.
[[https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/time-travel/]]


The default {{GenericInMemoryCatalog}} does not seem to implement 
{{getTable(ObjectPath, long)}}. I set up a {{hive metastore}} and created a 
{{hive catalog}}, but it turns out that the hive catalog does not implement 
it either -- I wish the Flink docs were more detailed about this ...
py4j.protocol.Py4JJavaError: An error occurred while calling o8.sqlQuery. : 
org.apache.flink.table.api.ValidationException: SQL validation failed. 
getTable(ObjectPath, long) is not implemented for class 
org.apache.flink.table.catalog.hive.HiveCatalog.
I have two options now to continue with this: * Try another catalog, like the 
Iceberg catalog, to see if that one implements {{getTable(ObjectPath, long)}}
 ** I am not able to find information on whether a given catalog 
implements this function. Should I dig into their source code to figure this 
out, or is there any other way to find out?
 * Implement a custom catalog with {{{}getTable(){}}}, following the 
*very-brief-instructions* given in the [Flink 
doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#interface-in-catalog-for-supporting-time-travel].

Is there a doc, article, mailing list or anything else that I can use to get 
more information on
 * Which catalogs implement time travel support?
 * If there is no readily available catalog supporting time travel, then how 
can we implement a custom catalog with time travel support? (See the sketch below.)

 ** The instructions given 
[here|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#interface-in-catalog-for-supporting-time-travel]
 are unfortunately not sufficient for a Java-illiterate person like myself.
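
To make the question concrete, here is a rough, untested sketch of what I imagine such a catalog would look like (assuming Flink 1.18+, where the getTable(ObjectPath, long) API exists; the in-memory snapshot map and the registerSnapshot helper are made up purely for illustration -- a real catalog would resolve table versions from its own metadata store):
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;

/** Sketch of a catalog that supports time travel by keeping table snapshots in memory. */
public class TimeTravelCatalog extends GenericInMemoryCatalog {

    // Illustrative store: tablePath -> (snapshot timestamp -> table definition at that time).
    private final Map<ObjectPath, NavigableMap<Long, CatalogBaseTable>> snapshots = new HashMap<>();

    public TimeTravelCatalog(String name, String defaultDatabase) {
        super(name, defaultDatabase);
    }

    /** Illustrative helper: register the table definition that was valid as of the given timestamp. */
    public void registerSnapshot(ObjectPath path, long timestamp, CatalogBaseTable table) {
        snapshots.computeIfAbsent(path, p -> new TreeMap<>()).put(timestamp, table);
    }

    /** Called by the planner for SELECT ... FOR SYSTEM_TIME AS OF ... queries. */
    @Override
    public CatalogBaseTable getTable(ObjectPath tablePath, long timestamp)
            throws TableNotExistException {
        NavigableMap<Long, CatalogBaseTable> versions = snapshots.get(tablePath);
        Map.Entry<Long, CatalogBaseTable> entry =
                versions == null ? null : versions.floorEntry(timestamp);
        if (entry == null) {
            throw new TableNotExistException(getName(), tablePath);
        }
        // Latest snapshot at or before the requested timestamp.
        return entry.getValue();
    }
}
{code}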

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]

2024-02-29 Thread via GitHub


JustinLeesin commented on PR #24397:
URL: https://github.com/apache/flink/pull/24397#issuecomment-1971516132

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]

2024-02-29 Thread via GitHub


JustinLeesin commented on PR #24397:
URL: https://github.com/apache/flink/pull/24397#issuecomment-1971488431

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]

2024-02-29 Thread via GitHub


JustinLeesin commented on code in PR #24397:
URL: https://github.com/apache/flink/pull/24397#discussion_r1507837138


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java:
##
@@ -210,6 +214,31 @@ void testNamedArgumentsWithOptionalArguments() {
 ResolvedSchema.of(Column.physical("result", 
DataTypes.STRING(;
 }
 
+@Test
+void testEnvironmentConf() throws DatabaseAlreadyExistException {
+// root conf should work
+Configuration configuration = new Configuration();
+configuration.setString("key1", "value1");
+StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+TestProcedureCatalogFactory.CatalogWithBuiltInProcedure 
procedureCatalog =
+new 
TestProcedureCatalogFactory.CatalogWithBuiltInProcedure("procedure_catalog");
+procedureCatalog.createDatabase(
+"system", new CatalogDatabaseImpl(Collections.emptyMap(), 
null), true);
+tableEnv.registerCatalog("test_p", procedureCatalog);
+tableEnv.useCatalog("test_p");

Review Comment:
   OK, I have refined the test case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34552) Support message deduplication for input data sources

2024-02-29 Thread Sergey Anokhovskiy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822189#comment-17822189
 ] 

Sergey Anokhovskiy commented on FLINK-34552:


Thank you for the comment [~martijnvisser] 
{quote}> produces messages with duplicate payload within PK

That basically conflicts with the entire premise of Flink where a primary key 
constraint is a hint for Flink to leverage for optimizations.
{quote}
I'm not sure I understood your message. Here is an example of the message 
sequence for the kafka topic with duplicates: (key1, Refresh, payload1), (key2, 
Refresh, payload2), (key1, Refresh, payload1), (key2, Refresh, payload3)

 
{quote}
But how would it then work at the source? Aren't you just moving the problem 
from somewhere else in the logic to the front of the logic, since then the 
source would have to keep this interval there? You will still encounter a large 
delay in that case.
{quote}
As mentioned in the description of the ticket, there are different strategies to 
deduplicate a data stream. For my use case, I'm particularly interested in the 
last one, where the job keeps a hash of the last message for each PK and filters 
out messages whose payload & message_type have not changed (see the sketch at the 
end of this comment).

To some extent I agree with you that it's a question of the best place for 
this logic. I had to create a separate service; however, I'd like to see it as a 
parameterized feature in the Table API.

 
{quote}Kafka topics a and b are CDC data streams and contain DELETE and REFRESH 
messages.

Are you actually using the upsert-kafka source, and treat these input sources 
as a changelog stream?

 
{quote}
The main point was that the ROW_NUMBER approach doesn't work for CDC streams 
because it cannot take the message_type into account.

 
{quote}I would like to understand as well how you would propose to have this 
logic expressed? Is it still SQL? Is it connector parameters?
{quote}
More as a connector parameter
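
To make the last strategy concrete, here is a minimal DataStream-level sketch of roughly what our separate service does (an illustration only, not a proposal for the exact Table API surface; CdcRecord is a placeholder POJO with getMessageType() and getPayload() accessors). In practice the state would also need a TTL so it does not grow without bound.
{code:java}
import java.util.Objects;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Drops a record when the previous record for the same PK had an identical (message_type, payload). */
public class LastMessageDeduplicator extends KeyedProcessFunction<String, CdcRecord, CdcRecord> {

    // CdcRecord: placeholder POJO (pk, messageType, payload) used only for this sketch.
    private transient ValueState<Integer> lastHash;

    @Override
    public void open(Configuration parameters) {
        lastHash = getRuntimeContext()
                .getState(new ValueStateDescriptor<>("last-message-hash", Integer.class));
    }

    @Override
    public void processElement(CdcRecord record, Context ctx, Collector<CdcRecord> out)
            throws Exception {
        int hash = Objects.hash(record.getMessageType(), record.getPayload());
        Integer previous = lastHash.value();
        if (previous == null || previous != hash) {
            // Payload or message type changed: remember it and forward the record.
            lastHash.update(hash);
            out.collect(record);
        }
        // Otherwise: duplicate of the last message for this PK, silently dropped.
    }
}
{code}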

 

> Support message deduplication for input data sources
> 
>
> Key: FLINK-34552
> URL: https://issues.apache.org/jira/browse/FLINK-34552
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API, Table SQL / Runtime
>Reporter: Sergey Anokhovskiy
>Priority: Major
>
> My main proposal is to have duplicate message suppression logic as a part of 
> Flink Table API to be able to suppress duplicates from the input sources. It 
> might be a parameter provided by the user if they want to suppress duplicates 
> from the input source or not. Below I provided more details about my use case 
> and available approaches.
>  
> I have a Flink job which reads from two keyed kafka topics and emits messages 
> to the keyed kafka topic. The Flink job executes the join query:
> SELECT a.id, adata, bdata
> FROM a
> JOIN b
> ON a.id = b.id
>  
> One of the input kafka topics produces messages with duplicate payload within 
> PK in addition to meaningful data. That causes duplicates in the output topic 
> and creates extra load to the downstream services.
>  
> I was looking for a way to suppress duplicates and I found two strategies 
> which doesn't seem to work for my use case:
>  #  Based on the deduplication window as a kafka[ sink 
> buffer|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L46]
>  for example. The Deduplication window doesn't work well for my case because 
> the interval between duplicates is one day and I don't want my data to be 
> delayed if I use such a big window.
>  
>  #  Using 
> [ROW_NUMBER|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/deduplication/]
>  . Unfortunately, this approach doesn't suit my use case either. Kafka topics 
> a and b are CDC data streams and contain DELETE and REFRESH messages. If 
> DELETE and REFRESH messages are coming with the same payload the job will 
> suppress the last message which will lead to the incorrect output result. If 
> I add message_type to the PARTITION key then the job will not be able to 
> process messages sequences like this: DELETE->REFRESH->DELETE (with the same 
> payload and PK), because the last message will be suppressed which will lead 
> to the incorrect output result.
>  
> Finally, I had to create a separate custom Flink service which reads the 
> output topic of the initial job and suppresses duplicates keeping hashes of 
> the last processed message for each PK in the Flink state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34522][core] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter [flink]

2024-02-29 Thread via GitHub


HuangXingBo commented on PR #24387:
URL: https://github.com/apache/flink/pull/24387#issuecomment-1971427877

   @1996fanrui The test failure is actually related to the implementation of 
`Duration`. Currently, `Duration` is a wrapper around the Java class in the JVM created 
by the `gateway`. If `Duration` is used as a default parameter, the JVM created by the 
`gateway` is launched earlier, at the time the default is evaluated. The crux of the 
problem is that the test environment variables set in `TestBase`, and the 
jar packages added through those test environment variables, would then no longer 
take effect. This results in the launched gateway not loading many 
Flink test jars, so we find that the test class is not found (there won't 
be a problem in production code; it only affects tests, because we add 
some test jars to the classpath in testing). The perfect solution is to change 
the Duration implementation, making Duration usable as a default parameter 
without breaking the tests. But this change would be significant; every 
part that uses `Duration` needs to be considered. In the short term, I believe 
we just need to remove Duration as the default parameter, represent the default 
parameter with `None`, and then check in the method whether the parameter is `None` or 
not, similar to the implementation below.
   ```python
   def cleanup_in_rocksdb_compact_filter(
           self,
           query_time_after_num_entries,
           periodic_compaction_time=None) -> 'StateTtlConfig.Builder':
       # Default to None so that no Duration (and hence no gateway JVM) is created
       # while the function signature is evaluated.
       self._strategies[
           StateTtlConfig.CleanupStrategies.Strategies.ROCKSDB_COMPACTION_FILTER] = \
           StateTtlConfig.CleanupStrategies.RocksdbCompactFilterCleanupStrategy(
               query_time_after_num_entries,
               periodic_compaction_time if periodic_compaction_time else Duration.of_days(30))
       return self
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-34552) Support message deduplication for input data sources

2024-02-29 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822172#comment-17822172
 ] 

Martijn Visser edited comment on FLINK-34552 at 2/29/24 3:13 PM:
-

A couple of points from my point of view:

{quote}produces messages with duplicate payload within PK{quote}

That basically conflicts with the entire premise of Flink where a primary key 
constraint is a hint for Flink to leverage for optimizations.

{quote}The Deduplication window doesn't work well for my case because the 
interval between duplicates is one day and I don't want my data to be delayed 
if I use such a big window.{quote}

But how would it then work at the source? Aren't you just moving the problem 
from somewhere else in the logic to the front of the logic, since then the 
source would have to keep this interval there? You will still encounter a large 
delay in that case. 

{quote}Kafka topics a and b are CDC data streams and contain DELETE and REFRESH 
messages. {quote}

Are you actually using the upsert-kafka source, and treat these input sources 
as a changelog stream? 

I would like to understand as well how you would propose to have this logic 
expressed? Is it still SQL? Is it connector parameters?


was (Author: martijnvisser):
A couple of points from my point of view:

{quote}produces messages with duplicate payload within PK{quote}

That basically conflicts with the entire premise of Flink where a primary key 
constraint is a hint for Flink to leverage for optimizations.

{quote}The Deduplication window doesn't work well for my case because the 
interval between duplicates is one day and I don't want my data to be delayed 
if I use such a big window.{quote}

But how would it then work at the source? Aren't you just moving the problem 
from somewhere else in the logic to the front of the logic, since then the 
source would have to keep this interval there? You will still encounter a large 
delay in that case. 

{quote}Kafka topics a and b are CDC data streams and contain DELETE and REFRESH 
messages. {quote}

Are you actually using the upsert-kafka source, and treat these input sources 
as a changelog stream? 

> Support message deduplication for input data sources
> 
>
> Key: FLINK-34552
> URL: https://issues.apache.org/jira/browse/FLINK-34552
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API, Table SQL / Runtime
>Reporter: Sergey Anokhovskiy
>Priority: Major
>
> My main proposal is to have duplicate message suppression logic as a part of 
> Flink Table API to be able to suppress duplicates from the input sources. It 
> might be a parameter provided by the user if they want to suppress duplicates 
> from the input source or not. Below I provided more details about my use case 
> and available approaches.
>  
> I have a Flink job which reads from two keyed kafka topics and emits messages 
> to the keyed kafka topic. The Flink job executes the join query:
> SELECT a.id, adata, bdata
> FROM a
> JOIN b
> ON a.id = b.id
>  
> One of the input kafka topics produces messages with duplicate payload within 
> PK in addition to meaningful data. That causes duplicates in the output topic 
> and creates extra load to the downstream services.
>  
> I was looking for a way to suppress duplicates and I found two strategies 
> which doesn't seem to work for my use case:
>  #  Based on the deduplication window as a kafka[ sink 
> buffer|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L46]
>  for example. The Deduplication window doesn't work well for my case because 
> the interval between duplicates is one day and I don't want my data to be 
> delayed if I use such a big window.
>  
>  #  Using 
> [ROW_NUMBER|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/deduplication/]
>  . Unfortunately, this approach doesn't suit my use case either. Kafka topics 
> a and b are CDC data streams and contain DELETE and REFRESH messages. If 
> DELETE and REFRESH messages are coming with the same payload the job will 
> suppress the last message which will lead to the incorrect output result. If 
> I add message_type to the PARTITION key then the job will not be able to 
> process messages sequences like this: DELETE->REFRESH->DELETE (with the same 
> payload and PK), because the last message will be suppressed which will lead 
> to the incorrect output result.
>  
> Finally, I had to create a separate custom Flink service which reads the 
> output topic of the initial job and suppresses duplicates keeping hashes of 
> the last processed message for each PK in the Flink state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

[jira] [Commented] (FLINK-34552) Support message deduplication for input data sources

2024-02-29 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822172#comment-17822172
 ] 

Martijn Visser commented on FLINK-34552:


A couple of points from my point of view:

{quote}produces messages with duplicate payload within PK{quote}

That basically conflicts with the entire premise of Flink where a primary key 
constraint is a hint for Flink to leverage for optimizations.

{quote}The Deduplication window doesn't work well for my case because the 
interval between duplicates is one day and I don't want my data to be delayed 
if I use such a big window.{quote}

But how would it then work at the source? Aren't you just moving the problem 
from somewhere else in the logic to the front of the logic, since then the 
source would have to keep this interval there? You will still encounter a large 
delay in that case. 

{quote}Kafka topics a and b are CDC data streams and contain DELETE and REFRESH 
messages. {quote}

Are you actually using the upsert-kafka source, and treat these input sources 
as a changelog stream? 

> Support message deduplication for input data sources
> 
>
> Key: FLINK-34552
> URL: https://issues.apache.org/jira/browse/FLINK-34552
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API, Table SQL / Runtime
>Reporter: Sergey Anokhovskiy
>Priority: Major
>
> My main proposal is to have duplicate message suppression logic as a part of 
> Flink Table API to be able to suppress duplicates from the input sources. It 
> might be a parameter provided by the user if they want to suppress duplicates 
> from the input source or not. Below I provided more details about my use case 
> and available approaches.
>  
> I have a Flink job which reads from two keyed kafka topics and emits messages 
> to the keyed kafka topic. The Flink job executes the join query:
> SELECT a.id, adata, bdata
> FROM a
> JOIN b
> ON a.id = b.id
>  
> One of the input kafka topics produces messages with duplicate payload within 
> PK in addition to meaningful data. That causes duplicates in the output topic 
> and creates extra load to the downstream services.
>  
> I was looking for a way to suppress duplicates and I found two strategies 
> which doesn't seem to work for my use case:
>  #  Based on the deduplication window as a kafka[ sink 
> buffer|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L46]
>  for example. The Deduplication window doesn't work well for my case because 
> the interval between duplicates is one day and I don't want my data to be 
> delayed if I use such a big window.
>  
>  #  Using 
> [ROW_NUMBER|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/deduplication/]
>  . Unfortunately, this approach doesn't suit my use case either. Kafka topics 
> a and b are CDC data streams and contain DELETE and REFRESH messages. If 
> DELETE and REFRESH messages are coming with the same payload the job will 
> suppress the last message which will lead to the incorrect output result. If 
> I add message_type to the PARTITION key then the job will not be able to 
> process messages sequences like this: DELETE->REFRESH->DELETE (with the same 
> payload and PK), because the last message will be suppressed which will lead 
> to the incorrect output result.
>  
> Finally, I had to create a separate custom Flink service which reads the 
> output topic of the initial job and suppresses duplicates keeping hashes of 
> the last processed message for each PK in the Flink state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]

2024-02-29 Thread via GitHub


morazow commented on code in PR #182:
URL: https://github.com/apache/flink-docker/pull/182#discussion_r1507732247


##
.github/workflows/ci.yml:
##
@@ -17,14 +17,22 @@ name: "CI"
 
 on: [push, pull_request]
 
+env:
+  TAR_URL: 
"https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz;
+
 jobs:
   ci:
+name: CI using JDK ${{ matrix.java_version }}
 runs-on: ubuntu-latest
+strategy:
+  fail-fast: false
+  max-parallel: 1

Review Comment:
   > > #11 8.399 + /bin/bash /opt/flink/bin/config-parser-utils.sh 
/opt/flink/conf /opt/flink/bin /opt/flink/lib -repKV 
rest.address,localhost,0.0.0.0 -repKV rest.bind-address,localhost,0.0.0.0 
-repKV jobmanager.bind-host,localhost,0.0.0.0 -repKV 
taskmanager.bind-host,localhost,0.0.0.0 -rmKV taskmanager.host=localhost
   > > #11 8.625 sed: can't read /config.yaml: No such file or directory
   > 
   > I forgot that we're also doing testing. Maybe, there's something odd with 
the config modification logic (see [workflow 
log](https://github.com/morazow/flink-docker/actions/runs/8081181748/job/22079149028#step:4:8838))?
   
   Yes, the tests have these kinds of issues. Here are other ones:
   
   ```
   WARNING: attempted to load jemalloc from 
/usr/lib/x86_64-linux-gnu/libjemalloc.so but the library couldn't be found. 
glibc will be used instead.
   ./docker-entrypoint.sh: line 80: 
/home/runner/work/flink-docker/flink-docker/testing/bin/config-parser-utils.sh: 
No such file or directory
   jemalloc was enabled but it was not found in the system. LD_PRELOAD is 
unchanged and glibc will be used instead.
   ./docker-entrypoint.sh: line 80: 
   /home/runner/work/flink-docker/flink-docker/testing/bin/config-parser-utils.sh: 
   No such file or directory
   ```
   
   But that line still appears even when the build is green. Here is a successful run: 
https://github.com/morazow/flink-docker/actions/runs/8096433961/job/22125972520
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29370) Protobuf in flink-sql-protobuf is not shaded

2024-02-29 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822169#comment-17822169
 ] 

tanjialiang commented on FLINK-29370:
-

[~jark] [~libenchao] [~maosuhan] I found that flink-sql-orc has the protobuf 
dependency without shading, and it conflicts with flink-sql-protobuf; my 
Flink version is 1.16.1. For now, my temporary workaround is to shade the 
protobuf dependency in both flink-sql-protobuf and my user proto classes 
myself.

> Protobuf in flink-sql-protobuf is not shaded
> 
>
> Key: FLINK-29370
> URL: https://issues.apache.org/jira/browse/FLINK-29370
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.16.0
>Reporter: Jark Wu
>Priority: Major
>
> The protobuf classes in flink-sql-protobuf is not shaded which may lead to 
> class conflicts. Usually, sql jars should shade common used dependencies, 
> e.g. flink-sql-avro: 
> https://github.com/apache/flink/blob/master/flink-formats/flink-sql-avro/pom.xml#L88
>  
> {code}
> ➜  Downloads jar tvf flink-sql-protobuf-1.16.0.jar | grep com.google
>  0 Tue Sep 13 20:23:44 CST 2022 com/google/
>  0 Tue Sep 13 20:23:44 CST 2022 com/google/protobuf/
>568 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/ProtobufInternalUtils.class
>  19218 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/AbstractMessage$Builder.class
>259 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/AbstractMessage$BuilderParent.class
>  10167 Tue Sep 13 20:23:44 CST 2022 com/google/protobuf/AbstractMessage.class
>   1486 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/AbstractMessageLite$Builder$LimitedInputStream.class
>  12399 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/AbstractMessageLite$Builder.class
>279 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/AbstractMessageLite$InternalOneOfEnu
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34552) Support message deduplication for input data sources

2024-02-29 Thread Ashish Khatkar (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822161#comment-17822161
 ] 

Ashish Khatkar edited comment on FLINK-34552 at 2/29/24 2:50 PM:
-

Adding an example for this

consider two tables
Table A
Fields :

{A, B, C, D}

Table B
Fields :

{A, E, F, G}

Query : Select A, B, F from table A join table b on field A

Consider a case where the join will contain 1B records for \{A, B, C, D, E, F, 
G} but the number of unique records for the fields we are interested in is 1M \{A, B, 
F}.

Now any change that happens in fields C, D, E, or G is going to produce records 
-U\{A, B, F} +U\{A, B, F} (as the join will emit a changelog stream), but 
effectively the records we are interested in haven't changed.


was (Author: akhatkar):
Adding an example for this

consider two tables
Table A
Fields :

{A, B, C, D}

Table B
Fields :

{A, E, F, G}

Query : Select A, B, F from table A join table b on field A

consider a case where the join will contain 1B records for \{A, B, C, D, E, F, 
G} but number of unique records for fields we are interested in are 1M \{A, B, 
F}.

Now any change that happens in fields C, D, E, G is going to produce records 
-U\{A, B, F} +U\{A, B, F} (as the join will emit changelog stream) but 
effectively the records hasn’t changed.

> Support message deduplication for input data sources
> 
>
> Key: FLINK-34552
> URL: https://issues.apache.org/jira/browse/FLINK-34552
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API, Table SQL / Runtime
>Reporter: Sergey Anokhovskiy
>Priority: Major
>
> My main proposal is to have duplicate message suppression logic as a part of 
> Flink Table API to be able to suppress duplicates from the input sources. It 
> might be a parameter provided by the user if they want to suppress duplicates 
> from the input source or not. Below I provided more details about my use case 
> and available approaches.
>  
> I have a Flink job which reads from two keyed kafka topics and emits messages 
> to the keyed kafka topic. The Flink job executes the join query:
> SELECT a.id, adata, bdata
> FROM a
> JOIN b
> ON a.id = b.id
>  
> One of the input kafka topics produces messages with duplicate payload within 
> PK in addition to meaningful data. That causes duplicates in the output topic 
> and creates extra load to the downstream services.
>  
> I was looking for a way to suppress duplicates and I found two strategies 
> which doesn't seem to work for my use case:
>  #  Based on the deduplication window as a kafka[ sink 
> buffer|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L46]
>  for example. The Deduplication window doesn't work well for my case because 
> the interval between duplicates is one day and I don't want my data to be 
> delayed if I use such a big window.
>  
>  #  Using 
> [ROW_NUMBER|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/deduplication/]
>  . Unfortunately, this approach doesn't suit my use case either. Kafka topics 
> a and b are CDC data streams and contain DELETE and REFRESH messages. If 
> DELETE and REFRESH messages are coming with the same payload the job will 
> suppress the last message which will lead to the incorrect output result. If 
> I add message_type to the PARTITION key then the job will not be able to 
> process messages sequences like this: DELETE->REFRESH->DELETE (with the same 
> payload and PK), because the last message will be suppressed which will lead 
> to the incorrect output result.
>  
> Finally, I had to create a separate custom Flink service which reads the 
> output topic of the initial job and suppresses duplicates keeping hashes of 
> the last processed message for each PK in the Flink state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-02-29 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1507354167


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/AbstractSortPartitionOperator.java:
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorAttributes;
+import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * The {@link AbstractSortPartitionOperator} is the base class of sort 
partition operator, which
+ * provides shared construction methods and utility functions.
+ *
+ * @param  The type of input record.
+ * @param  The type used to sort the records, which may be 
different from the INPUT_TYPE.
+ * For example, if the input record is sorted according to the selected 
key by {@link
+ * KeySelector}, the selected key should also be written to {@link 
ExternalSorter} with the
+ * input record to avoid repeated key selections. In this case, the type 
used to sort the records
+ * will be a tuple containing both the selected key and record.
+ */
+@Internal
+@SuppressWarnings("unchecked")
+public abstract class AbstractSortPartitionOperator
+extends AbstractStreamOperator
+implements OneInputStreamOperator, 
BoundedOneInput {
+
+/** The default manage memory weight of sort partition operator. */
+public static final int DEFAULT_MANAGE_MEMORY_WEIGHT = 128;

Review Comment:
   I kept it the same as the default managed memory weight of the batch keyed 
sorter. Developers could use 
`getTransformation().declareManagedMemoryUseCaseAtOperatorScope()` to set the 
memory usage themselves (see the sketch below). Adding a separate configuration 
item for a DataStream API would be a little complicated. WDYT?
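   
   For example, a minimal sketch of that knob (assuming the API shape proposed in this PR; the example pipeline and the weight 256 are purely illustrative):
   
   ```java
   import org.apache.flink.api.common.operators.Order;
   import org.apache.flink.core.memory.ManagedMemoryUseCase;
   import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

   // Sort records within each non-keyed partition; the operator itself declares the
   // default operator-scope managed memory weight (128).
   SingleOutputStreamOperator<Long> sorted =
           env.fromSequence(1, 100)
                   .fullWindowPartition()
                   .sortPartition(value -> value, Order.ASCENDING);

   // Job authors who need more sort memory can override the declared weight themselves.
   sorted.getTransformation()
           .declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 256);
   ```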



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/AbstractSortPartitionOperator.java:
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 

[jira] [Comment Edited] (FLINK-34552) Support message deduplication for input data sources

2024-02-29 Thread Ashish Khatkar (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822161#comment-17822161
 ] 

Ashish Khatkar edited comment on FLINK-34552 at 2/29/24 2:49 PM:
-

Adding an example for this

consider two tables
Table A
Fields :

{A, B, C, D}

Table B
Fields :

{A, E, F, G}

Query : Select A, B, F from table A join table b on field A

consider a case where the join will contain 1B records for \{A, B, C, D, E, F, 
G} but number of unique records for fields we are interested in are 1M \{A, B, 
F}.

Now any change that happens in fields C, D, E, G is going to produce records 
-U\{A, B, F} +U\{A, B, F} (as the join will emit changelog stream) but 
effectively the records hasn’t changed.


was (Author: akhatkar):
Adding an example for this

consider two tables
Table A
Fields : {A, B, C, D}
Table B
Fields : {A, E, F, G}

Query : Select A, B, F from table A join table b on field A

consider a case where the join will contain 1B records for {A, B, C, D, E, F, 
G} but number of unique records for fields we are interested in are 1M {A, B, 
F}.

now any change that happens in fields C, D, E, G is going to produce records 
-U{A, B, F} +U{A, B, F} (as the join will emit changelog stream) but 
effectively the records hasn’t changed.

> Support message deduplication for input data sources
> 
>
> Key: FLINK-34552
> URL: https://issues.apache.org/jira/browse/FLINK-34552
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API, Table SQL / Runtime
>Reporter: Sergey Anokhovskiy
>Priority: Major
>
> My main proposal is to have duplicate message suppression logic as a part of 
> Flink Table API to be able to suppress duplicates from the input sources. It 
> might be a parameter provided by the user if they want to suppress duplicates 
> from the input source or not. Below I provided more details about my use case 
> and available approaches.
>  
> I have a Flink job which reads from two keyed kafka topics and emits messages 
> to the keyed kafka topic. The Flink job executes the join query:
> SELECT a.id, adata, bdata
> FROM a
> JOIN b
> ON a.id = b.id
>  
> One of the input kafka topics produces messages with duplicate payload within 
> PK in addition to meaningful data. That causes duplicates in the output topic 
> and creates extra load to the downstream services.
>  
> I was looking for a way to suppress duplicates and I found two strategies 
> which doesn't seem to work for my use case:
>  #  Based on the deduplication window as a kafka[ sink 
> buffer|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L46]
>  for example. The Deduplication window doesn't work well for my case because 
> the interval between duplicates is one day and I don't want my data to be 
> delayed if I use such a big window.
>  
>  #  Using 
> [ROW_NUMBER|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/deduplication/]
>  . Unfortunately, this approach doesn't suit my use case either. Kafka topics 
> a and b are CDC data streams and contain DELETE and REFRESH messages. If 
> DELETE and REFRESH messages are coming with the same payload the job will 
> suppress the last message which will lead to the incorrect output result. If 
> I add message_type to the PARTITION key then the job will not be able to 
> process messages sequences like this: DELETE->REFRESH->DELETE (with the same 
> payload and PK), because the last message will be suppressed which will lead 
> to the incorrect output result.
>  
> Finally, I had to create a separate custom Flink service which reads the 
> output topic of the initial job and suppresses duplicates keeping hashes of 
> the last processed message for each PK in the Flink state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-02-29 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1507354390


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java:
##
@@ -1071,6 +1077,110 @@ protected SingleOutputStreamOperator 
aggregate(AggregationFunction aggrega
 return reduce(aggregate).name("Keyed Aggregation");
 }
 
+@Override
+public PartitionWindowedStream fullWindowPartition() {
+throw new UnsupportedOperationException(
+"KeyedStream doesn't support processing non-keyed 
partitions.");

Review Comment:
   The note has been fixed.



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/AbstractSortPartitionOperator.java:
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorAttributes;
+import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * The {@link AbstractSortPartitionOperator} is the base class of sort 
partition operator, which
+ * provides shared construction methods and utility functions.
+ *
+ * @param  The type of input record.
+ * @param  The type used to sort the records, which may be 
different from the INPUT_TYPE.
+ * For example, if the input record is sorted according to the selected 
key by {@link
+ * KeySelector}, the selected key should also be written to {@link 
ExternalSorter} with the
+ * input record to avoid repeated key selections. In this case, the type 
used to sort the records
+ * will be a tuple containing both the selected key and record.
+ */
+@Internal
+@SuppressWarnings("unchecked")
+public abstract class AbstractSortPartitionOperator

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34552) Support message deduplication for input data sources

2024-02-29 Thread Ashish Khatkar (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822161#comment-17822161
 ] 

Ashish Khatkar commented on FLINK-34552:


Adding an example for this

consider two tables
Table A
Fields : {A, B, C, D}
Table B
Fields : {A, E, F, G}

Query : Select A, B, F from table A join table b on field A

consider a case where the join will contain 1B records for {A, B, C, D, E, F, 
G} but number of unique records for fields we are interested in are 1M {A, B, 
F}.

now any change that happens in fields C, D, E, G is going to produce records 
-U{A, B, F} +U{A, B, F} (as the join will emit changelog stream) but 
effectively the records hasn’t changed.

> Support message deduplication for input data sources
> 
>
> Key: FLINK-34552
> URL: https://issues.apache.org/jira/browse/FLINK-34552
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API, Table SQL / Runtime
>Reporter: Sergey Anokhovskiy
>Priority: Major
>
> My main proposal is to have duplicate message suppression logic as a part of 
> Flink Table API to be able to suppress duplicates from the input sources. It 
> might be a parameter provided by the user if they want to suppress duplicates 
> from the input source or not. Below I provided more details about my use case 
> and available approaches.
>  
> I have a Flink job which reads from two keyed kafka topics and emits messages 
> to the keyed kafka topic. The Flink job executes the join query:
> SELECT a.id, adata, bdata
> FROM a
> JOIN b
> ON a.id = b.id
>  
> One of the input kafka topics produces messages with duplicate payload within 
> PK in addition to meaningful data. That causes duplicates in the output topic 
> and creates extra load to the downstream services.
>  
> I was looking for a way to suppress duplicates and I found two strategies 
> which doesn't seem to work for my use case:
>  #  Based on the deduplication window as a kafka[ sink 
> buffer|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L46]
>  for example. The Deduplication window doesn't work well for my case because 
> the interval between duplicates is one day and I don't want my data to be 
> delayed if I use such a big window.
>  
>  #  Using 
> [ROW_NUMBER|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/deduplication/]
>  . Unfortunately, this approach doesn't suit my use case either. Kafka topics 
> a and b are CDC data streams and contain DELETE and REFRESH messages. If 
> DELETE and REFRESH messages are coming with the same payload the job will 
> suppress the last message which will lead to the incorrect output result. If 
> I add message_type to the PARTITION key then the job will not be able to 
> process messages sequences like this: DELETE->REFRESH->DELETE (with the same 
> payload and PK), because the last message will be suppressed which will lead 
> to the incorrect output result.
>  
> Finally, I had to create a separate custom Flink service which reads the 
> output topic of the initial job and suppresses duplicates keeping hashes of 
> the last processed message for each PK in the Flink state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34552) Support message deduplication for input data sources

2024-02-29 Thread Sergey Anokhovskiy (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Anokhovskiy updated FLINK-34552:
---
Description: 
My main proposal is to have duplicate message suppression logic as a part of 
Flink Table API to be able to suppress duplicates from the input sources. It 
might be a parameter provided by the user if they want to suppress duplicates 
from the input source or not. Below I provided more details about my use case 
and available approaches.

 

I have a Flink job which reads from two keyed kafka topics and emits messages 
to the keyed kafka topic. The Flink job executes the join query:

SELECT a.id, adata, bdata

FROM a

JOIN b

ON a.id = b.id

 

One of the input kafka topics produces messages with duplicate payload within 
PK in addition to meaningful data. That causes duplicates in the output topic 
and creates extra load to the downstream services.

 

I was looking for a way to suppress duplicates and I found two strategies which 
don't seem to work for my use case:
 #  Based on the deduplication window as a kafka[ sink 
buffer|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L46]
 for example. The Deduplication window doesn't work well for my case because 
the interval between duplicates is one day and I don't want my data to be 
delayed if I use such a big window.

 
 #  Using 
[ROW_NUMBER|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/deduplication/]
 . Unfortunately, this approach doesn't suit my use case either. Kafka topics a 
and b are CDC data streams and contain DELETE and REFRESH messages. If DELETE 
and REFRESH messages are coming with the same payload the job will suppress the 
last message which will lead to the incorrect output result. If I add 
message_type to the PARTITION key then the job will not be able to process 
messages sequences like this: DELETE->REFRESH->DELETE (with the same payload 
and PK), because the last message will be suppressed which will lead to the 
incorrect output result.

 

Finally, I had to create a separate custom Flink service which reads the output 
topic of the initial job and suppresses duplicates keeping hashes of the last 
processed message for each PK in the Flink state.

  was:
My main proposal is: To have duplicate message suppression logic as a part of 
flink table api to be able to suppress duplicates from the input sources. It 
might be a parameter provided by user if they want to suppress duplicates from 
the input source or not. Below I provided more details about my use case and 
available approaches.

I have a flink job which reads from two keyed kafka topics and emits messages 
to the keyed kafka topic. The flink job executes the join query:
SELECT a.id, adata, bdata
FROM a
JOIN b
ON a.id = b.id

One of the input kafka topics produces messages with duplicate payload within 
PK in additional to meaningful data. That causes duplicates in the output topic 
and creates extra load to the downstream services.

I was looking for a way to suppress duplicates and I found two strategies which 
doesn't seem to work for my use case:
1. Based on deduplication window as kafka sink buffer for example 
https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L46
Deduplication window doesn't work well for my case because the interval between 
duplicates is one day and I don't want my data to be delayed if use such a big 
window.

2. Using ROW_NUMBER 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/deduplication/
 . Unfortunately, this approach doesn't suit my use case too. Kafka topics a 
and b are CDC data streams and contain DELETE and REFRESH messages. If DELETE 
and REFRESH messages are comming with the same payload the job will suppress 
the last message which will lead to the incorrect output result. If I add 
message_type to the PARTITION key then the job will not be able to process 
messages sequences like this: DELETE->REFRESH->DELETE (with the same payload 
and PK), because the last message will be suppressed which will lead to the 
incorrect output result.

Finally, I had to create a separate custom flink service which reads the output 
topic of the initial job and suppresses duplicates keeping message hashes in 
the state. The initial join job, described above, still has to process 
duplicates. Would it better to be able to suppress duplicates from the input 
sources?

 


> Support message deduplication for input data sources
> 
>
> Key: FLINK-34552
> URL: https://issues.apache.org/jira/browse/FLINK-34552
> Project: Flink
> 

[jira] [Created] (FLINK-34551) Align retry mechanisms of FutureUtils

2024-02-29 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-34551:
-

 Summary: Align retry mechanisms of FutureUtils
 Key: FLINK-34551
 URL: https://issues.apache.org/jira/browse/FLINK-34551
 Project: Flink
  Issue Type: Technical Debt
  Components: API / Core
Affects Versions: 1.20.0
Reporter: Matthias Pohl


The retry mechanisms of FutureUtils include quite a bit of redundant code, which 
makes them hard to understand and to extend. The logic should be properly aligned.
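
As a rough illustration of the kind of alignment this ticket asks for (this is not 
the actual FutureUtils API; the class, method name, and parameter list below are 
hypothetical), a single shared retry primitive that the fixed-retry and 
retry-with-delay variants could delegate to might look like this:

{code:java}
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical sketch; not the real FutureUtils API. */
public final class RetryAlignmentSketch {

    /** One shared retry primitive that the existing retry variants could delegate to. */
    public static <T> CompletableFuture<T> retryWithStrategy(
            Supplier<CompletableFuture<T>> operation,
            int remainingRetries,
            Duration delay,
            Predicate<Throwable> isRetryable,
            ScheduledExecutorService scheduler) {

        CompletableFuture<T> result = new CompletableFuture<>();
        operation
                .get()
                .whenComplete(
                        (value, failure) -> {
                            if (failure == null) {
                                result.complete(value);
                            } else if (remainingRetries > 0 && isRetryable.test(failure)) {
                                // The only place that schedules the next attempt.
                                scheduler.schedule(
                                        () -> {
                                            retryWithStrategy(
                                                            operation,
                                                            remainingRetries - 1,
                                                            delay,
                                                            isRetryable,
                                                            scheduler)
                                                    .whenComplete(
                                                            (v, f) -> {
                                                                if (f == null) {
                                                                    result.complete(v);
                                                                } else {
                                                                    result.completeExceptionally(f);
                                                                }
                                                            });
                                        },
                                        delay.toMillis(),
                                        TimeUnit.MILLISECONDS);
                            } else {
                                result.completeExceptionally(failure);
                            }
                        });
        return result;
    }

    private RetryAlignmentSketch() {}
}
{code}

A plain fixed-retry variant would then call this with Duration.ZERO and an 
always-true predicate instead of duplicating the completion handling.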



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34552) Support message deduplication for input data sources

2024-02-29 Thread Sergey Anokhovskiy (Jira)
Sergey Anokhovskiy created FLINK-34552:
--

 Summary: Support message deduplication for input data sources
 Key: FLINK-34552
 URL: https://issues.apache.org/jira/browse/FLINK-34552
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API, Table SQL / Runtime
Reporter: Sergey Anokhovskiy


My main proposal is to have duplicate message suppression logic as a part of the 
Flink Table API, to be able to suppress duplicates from the input sources. It 
might be a parameter provided by the user to control whether duplicates from the 
input source should be suppressed or not. Below I provide more details about my 
use case and the available approaches.

I have a Flink job which reads from two keyed Kafka topics and emits messages 
to a keyed Kafka topic. The Flink job executes the join query:
SELECT a.id, adata, bdata
FROM a
JOIN b
ON a.id = b.id

One of the input Kafka topics produces messages with duplicate payload within a 
PK in addition to meaningful data. That causes duplicates in the output topic 
and creates extra load on the downstream services.

I was looking for a way to suppress duplicates and I found two strategies which 
don't seem to work for my use case:
1. Based on a deduplication window, for example as a Kafka sink buffer: 
https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L46
A deduplication window doesn't work well for my case because the interval between 
duplicates is one day and I don't want my data to be delayed if I use such a big 
window.

2. Using ROW_NUMBER: 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/deduplication/
Unfortunately, this approach doesn't suit my use case either. Kafka topics a 
and b are CDC data streams and contain DELETE and REFRESH messages. If DELETE 
and REFRESH messages come with the same payload, the job will suppress the 
last message, which leads to an incorrect output result. If I add 
message_type to the PARTITION key, then the job will not be able to process 
message sequences like DELETE->REFRESH->DELETE (with the same payload 
and PK), because the last message will be suppressed, which again leads to an 
incorrect output result. (A sketch of this ROW_NUMBER pattern is shown below.)

Finally, I had to create a separate custom Flink service which reads the output 
topic of the initial job and suppresses duplicates, keeping message hashes in 
state. The initial join job, described above, still has to process 
duplicates. Would it be better to be able to suppress duplicates from the input 
sources?
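
For illustration only (this sketch is not part of the original report): a minimal 
Table API program showing the ROW_NUMBER-based deduplication from approach 2 above, 
assuming hypothetical table and column names (source_a, id, adata, message_type) 
and a placeholder datagen connector in place of the real Kafka source.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RowNumberDeduplicationSketch {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical CDC-like source; the real setup would use a Kafka table, not datagen.
        tEnv.executeSql(
                "CREATE TEMPORARY TABLE source_a ("
                        + "  id STRING,"
                        + "  adata STRING,"
                        + "  message_type STRING,"   // e.g. DELETE / REFRESH in the CDC stream
                        + "  proctime AS PROCTIME()"
                        + ") WITH ("
                        + "  'connector' = 'datagen'"
                        + ")");

        // 'Keep first row' deduplication: later messages with the same (id, adata) are
        // suppressed. Without message_type in PARTITION BY, a DELETE and a REFRESH with the
        // same payload collapse into one; with message_type added, the second DELETE in a
        // DELETE -> REFRESH -> DELETE sequence is suppressed. Both outcomes are the
        // limitation described above.
        tEnv.executeSql(
                "SELECT id, adata, message_type FROM ("
                        + "  SELECT *, ROW_NUMBER() OVER ("
                        + "    PARTITION BY id, adata ORDER BY proctime ASC) AS row_num"
                        + "  FROM source_a"
                        + ") WHERE row_num = 1")
                .print();
    }
}
{code}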

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]

2024-02-29 Thread via GitHub


morazow commented on code in PR #182:
URL: https://github.com/apache/flink-docker/pull/182#discussion_r1507649411


##
.github/workflows/ci.yml:
##
@@ -17,14 +17,22 @@ name: "CI"
 
 on: [push, pull_request]
 
+env:
+  TAR_URL: 
"https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz;
+
 jobs:
   ci:
+name: CI using JDK ${{ matrix.java_version }}
 runs-on: ubuntu-latest
+strategy:
+  fail-fast: false
+  max-parallel: 1

Review Comment:
   Read about the GHA concurrency: 
https://docs.github.com/en/actions/using-jobs/using-concurrency#using-concurrency-in-different-scenarios
   
   My assumption is that I pushed another commit while another workflow was 
running and maybe they clashed on the container name. But I tried to trigger 
several workflow runs and still could not reproduce the issue above.
   
   I would suggest adding a concurrency definition to our matrix builds:
   
   ```
   concurrency:
 group: ${{ github.workflow }}-${{ github.ref }}-java-${{ 
matrix.java_version }}
 cancel-in-progress: true
   ```
   
   This way, if commits are pushed one after the other, only the last commit will 
run the workflow.
   
   What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]

2024-02-29 Thread via GitHub


morazow commented on code in PR #182:
URL: https://github.com/apache/flink-docker/pull/182#discussion_r1507649411


##
.github/workflows/ci.yml:
##
@@ -17,14 +17,22 @@ name: "CI"
 
 on: [push, pull_request]
 
+env:
+  TAR_URL: 
"https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz;
+
 jobs:
   ci:
+name: CI using JDK ${{ matrix.java_version }}
 runs-on: ubuntu-latest
+strategy:
+  fail-fast: false
+  max-parallel: 1

Review Comment:
   Read about the GHA concurrency: 
https://docs.github.com/en/actions/using-jobs/using-concurrency#using-concurrency-in-different-scenarios
   
   My assumption is that I pushed another commit while another workflow was 
running and maybe they clashed on the container name.
   
   I would suggest adding a concurrency definition to our matrix builds:
   
   ```
   concurrency:
 group: ${{ github.workflow }}-${{ github.ref }}-java-${{ 
matrix.java_version }}
 cancel-in-progress: true
   ```
   
   This way, if commits are pushed one after the other, only the last commit will 
run the workflow.
   
   What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]

2024-02-29 Thread via GitHub


morazow commented on code in PR #182:
URL: https://github.com/apache/flink-docker/pull/182#discussion_r1507649411


##
.github/workflows/ci.yml:
##
@@ -17,14 +17,22 @@ name: "CI"
 
 on: [push, pull_request]
 
+env:
+  TAR_URL: 
"https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz;
+
 jobs:
   ci:
+name: CI using JDK ${{ matrix.java_version }}
 runs-on: ubuntu-latest
+strategy:
+  fail-fast: false
+  max-parallel: 1

Review Comment:
   Read about the GHA concurrency: 
https://docs.github.com/en/actions/using-jobs/using-concurrency#using-concurrency-in-different-scenarios
   
   My assumption is that I pushed another commit while another workflow was 
running and maybe they clashed on the container name.
   
   I would suggest adding a concurrency definition to our matrix builds:
   
   ```
   concurrency:
 group: ${{ github.workflow }}-${{ github.ref }}-java-${{ 
matrix.java_version }}
 cancel-in-progress: true
   ```
   
   This way, if commits are pushed one after the other, only the last commit will 
run the workflow.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]

2024-02-29 Thread via GitHub


XComp commented on code in PR #182:
URL: https://github.com/apache/flink-docker/pull/182#discussion_r1507624836


##
.github/workflows/ci.yml:
##
@@ -17,14 +17,22 @@ name: "CI"
 
 on: [push, pull_request]
 
+env:
+  TAR_URL: 
"https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz;
+
 jobs:
   ci:
+name: CI using JDK ${{ matrix.java_version }}
 runs-on: ubuntu-latest
+strategy:
+  fail-fast: false
+  max-parallel: 1

Review Comment:
   > #11 8.399 + /bin/bash /opt/flink/bin/config-parser-utils.sh 
/opt/flink/conf /opt/flink/bin /opt/flink/lib -repKV 
rest.address,localhost,0.0.0.0 -repKV rest.bind-address,localhost,0.0.0.0 
-repKV jobmanager.bind-host,localhost,0.0.0.0 -repKV 
taskmanager.bind-host,localhost,0.0.0.0 -rmKV taskmanager.host=localhost
   > #11 8.625 sed: can't read /config.yaml: No such file or directory
   
   I forgot that we're also doing testing. Maybe, there's something odd with 
the config modification logic (see [workflow 
log](https://github.com/morazow/flink-docker/actions/runs/8081181748/job/22079149028#step:4:8838))?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]

2024-02-29 Thread via GitHub


XComp commented on code in PR #182:
URL: https://github.com/apache/flink-docker/pull/182#discussion_r1507624836


##
.github/workflows/ci.yml:
##
@@ -17,14 +17,22 @@ name: "CI"
 
 on: [push, pull_request]
 
+env:
+  TAR_URL: 
"https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz;
+
 jobs:
   ci:
+name: CI using JDK ${{ matrix.java_version }}
 runs-on: ubuntu-latest
+strategy:
+  fail-fast: false
+  max-parallel: 1

Review Comment:
   > #11 8.399 + /bin/bash /opt/flink/bin/config-parser-utils.sh 
/opt/flink/conf /opt/flink/bin /opt/flink/lib -repKV 
rest.address,localhost,0.0.0.0 -repKV rest.bind-address,localhost,0.0.0.0 
-repKV jobmanager.bind-host,localhost,0.0.0.0 -repKV 
taskmanager.bind-host,localhost,0.0.0.0 -rmKV taskmanager.host=localhost
   > #11 8.625 sed: can't read /config.yaml: No such file or directory
   
   Maybe, there's something odd with the config modification logic (see 
[workflow 
log](https://github.com/morazow/flink-docker/actions/runs/8081181748/job/22079149028#step:4:8838))?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34550) attempted task still report metric of currentEmitEventTimeLag

2024-02-29 Thread siwei.gao (Jira)
siwei.gao created FLINK-34550:
-

 Summary: attempted task still report metric of 
currentEmitEventTimeLag
 Key: FLINK-34550
 URL: https://issues.apache.org/jira/browse/FLINK-34550
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.17.1
 Environment: flink version:1.17.1  

kafka-connector:1.17.1
Reporter: siwei.gao
 Attachments: image-2024-02-29-21-41-01-709.png, 
image-2024-02-29-21-43-18-340.png, image-2024-02-29-21-50-55-160.png

An attempted task still reports the currentEmitEventTimeLag metric when using the 
Kafka connector. Within the same time period, the metric is reported for multiple 
tasks that have the same subtask_index but different task_attempt_num values. 
!image-2024-02-29-21-43-18-340.png!

Normally, only the metric whose task_attempt_num is 4 should be reported.

This condition occurs in a TaskManager with multiple slots; it does not occur when 
the TaskManager has only one slot.

!image-2024-02-29-21-50-55-160.png!

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33436] Documentation for the built-in Profiler [flink]

2024-02-29 Thread via GitHub


yuchen-ecnu commented on PR #24403:
URL: https://github.com/apache/flink/pull/24403#issuecomment-1971160199

   Hi @rmetzger, thanks for the quick and detailed comments! While async-profiler 
supports most platforms, it does not support all of them. So adding the 
"Requirements" section is a good suggestion, and I've added it in the latest commit. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]

2024-02-29 Thread via GitHub


morazow commented on code in PR #182:
URL: https://github.com/apache/flink-docker/pull/182#discussion_r1507558823


##
.github/workflows/ci.yml:
##
@@ -17,14 +17,22 @@ name: "CI"
 
 on: [push, pull_request]
 
+env:
+  TAR_URL: 
"https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz;
+
 jobs:
   ci:
+name: CI using JDK ${{ matrix.java_version }}
 runs-on: ubuntu-latest
+strategy:
+  fail-fast: false
+  max-parallel: 1

Review Comment:
   But the rerun of the job is fine: 
https://github.com/morazow/flink-docker/actions/runs/8081181748
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-34529) Projection cannot be pushed down through rank operator.

2024-02-29 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822118#comment-17822118
 ] 

Benchao Li edited comment on FLINK-34529 at 2/29/24 1:08 PM:
-

[~xuyangzhong] [~nilerzhou] Thank you for the explanation, it helps.

I would prefer putting these transposing rules all in the "LOGICAL" stage, since 
in this stage we are using a cost-based planner. I'm wondering if it's really 
necessary to have some transposing rules (now only 
{{CalcRankTransposeRule}}) in the "LOGICAL_REWRITE" stage; could you check 
whether we still need {{CalcRankTransposeRule}} in "LOGICAL_REWRITE" after 
introducing {{ProjectWindowTransposeRule}} in the "LOGICAL" stage?

What's more, I'm even wondering whether we really need 
{{CalcRankTransposeRule}}. {{Rank}} is a special form of {{Window}}, so 
{{ProjectWindowTransposeRule}} should supersede {{CalcRankTransposeRule}}.

(By "transposing rules", I would usually expect these rules to only generate 
more plan alternatives, and the cost-based planner chooses which is 
better via cost. That's why you can see many counterpart pairs of rules like 
{{AggregateFilterTransposeRule}} and {{FilterAggregateTransposeRule}} in 
Calcite.)


was (Author: libenchao):
[~xuyangzhong][~nilerzhou] Thank you for the explanation, it helps. 

I would prefer to putting these transposing rules all in "LOGICAL" stage, since 
in this stage we are using cost-based planner. I'm wondering if it's really 
necessary to have some transposing rules (now only {{CalcJoinTransposeRule}}) 
in "LOGICAL_REWRITE" stage, could you check whether we still needs  
{{CalcJoinTransposeRule}} in "LOGICAL_REWRITE" after introducing 
{{ProjectWindowTransposeRule}} in "LOGICAL" stage?

What's more, I'm even wondering that if we really needs 
{{CalcJoinTransposeRule}}. {{Rank}} is a special form of {{Window}}, so 
{{ProjectWindowTransposeRule}} should supersede {{CalcJoinTransposeRule}}. 

(By saying "transposing rules", usually I would expect these rules are only 
generating more plan alternatives, cost-based planner chooses which is is 
better via cost. That's why you can see many counter pairs of rules like 
{{AggregateFilterTransposeRule}} and {{FilterAggregateTransposeRule}} in 
Calcite)

> Projection cannot be pushed down through rank operator.
> ---
>
> Key: FLINK-34529
> URL: https://issues.apache.org/jira/browse/FLINK-34529
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: yisha zhou
>Assignee: yisha zhou
>Priority: Major
>
> When there is a rank/deduplicate operator, the projection based on output of 
> this operator cannot be pushed down to the input of it.
> The following code can help reproducing the issue:
> {code:java}
> val util = streamTestUtil() 
> util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
> util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
> val sql =
>   """
> |SELECT a FROM (
> |  SELECT a, f,
> |  ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
> |  FROM  T1, T2
> |  WHERE T1.a = T2.d
> |)
> |WHERE rank_num = 1
>   """.stripMargin
> util.verifyPlan(sql){code}
> The plan is expected to be:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>+- Exchange(distribution=[hash[f]])
>   +- Calc(select=[a, c, f])
>  +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a]])
> :  +- Calc(select=[a, c])
> : +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[d]])
>+- Calc(select=[d, f])
>   +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) 
> {code}
> Notice that the 'select' of Join operator is [a, c, d, f]. However the actual 
> plan is:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>+- Exchange(distribution=[hash[f]])
>   +- Calc(select=[a, c, f])
>  +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, 
> e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a]])
> :  +- LegacyTableSourceScan(table=[[default_catalog, 
> 

[jira] [Commented] (FLINK-34529) Projection cannot be pushed down through rank operator.

2024-02-29 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822118#comment-17822118
 ] 

Benchao Li commented on FLINK-34529:


[~xuyangzhong][~nilerzhou] Thank you for the explanation, it helps. 

I would prefer putting these transposing rules all in the "LOGICAL" stage, since 
in this stage we are using a cost-based planner. I'm wondering if it's really 
necessary to have some transposing rules (now only {{CalcJoinTransposeRule}}) 
in the "LOGICAL_REWRITE" stage; could you check whether we still need 
{{CalcJoinTransposeRule}} in "LOGICAL_REWRITE" after introducing 
{{ProjectWindowTransposeRule}} in the "LOGICAL" stage?

What's more, I'm even wondering whether we really need 
{{CalcJoinTransposeRule}}. {{Rank}} is a special form of {{Window}}, so 
{{ProjectWindowTransposeRule}} should supersede {{CalcJoinTransposeRule}}. 

(By "transposing rules", I would usually expect these rules to only generate 
more plan alternatives, and the cost-based planner chooses which is 
better via cost. That's why you can see many counterpart pairs of rules like 
{{AggregateFilterTransposeRule}} and {{FilterAggregateTransposeRule}} in 
Calcite.)
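
For illustration (FlinkStreamRuleSets itself is a Scala object, so this is not the 
actual Flink code): a minimal Java sketch, using Calcite's RuleSets API, of what 
including the proposed rule in the cost-based logical rule set could look like. 
CoreRules.PROJECT_WINDOW_TRANSPOSE is Calcite's handle for 
ProjectWindowTransposeRule; the surrounding class and method names are made up.

{code:java}
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.calcite.tools.RuleSet;
import org.apache.calcite.tools.RuleSets;

public final class LogicalRuleSetSketch {

    /** Rules applied during the cost-based "LOGICAL" phase in this sketch. */
    public static RuleSet logicalOptRules() {
        return RuleSets.ofList(
                // ... existing transpose-style rules would be listed here ...
                CoreRules.PROJECT_FILTER_TRANSPOSE,
                // Rule discussed in FLINK-34529: push projections through the window,
                // since Rank is a special form of Window.
                CoreRules.PROJECT_WINDOW_TRANSPOSE);
    }

    private LogicalRuleSetSketch() {}
}
{code}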

> Projection cannot be pushed down through rank operator.
> ---
>
> Key: FLINK-34529
> URL: https://issues.apache.org/jira/browse/FLINK-34529
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: yisha zhou
>Priority: Major
>
> When there is a rank/deduplicate operator, the projection based on output of 
> this operator cannot be pushed down to the input of it.
> The following code can help reproducing the issue:
> {code:java}
> val util = streamTestUtil() 
> util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
> util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
> val sql =
>   """
> |SELECT a FROM (
> |  SELECT a, f,
> |  ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
> |  FROM  T1, T2
> |  WHERE T1.a = T2.d
> |)
> |WHERE rank_num = 1
>   """.stripMargin
> util.verifyPlan(sql){code}
> The plan is expected to be:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>+- Exchange(distribution=[hash[f]])
>   +- Calc(select=[a, c, f])
>  +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a]])
> :  +- Calc(select=[a, c])
> : +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[d]])
>+- Calc(select=[d, f])
>   +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) 
> {code}
> Notice that the 'select' of Join operator is [a, c, d, f]. However the actual 
> plan is:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>+- Exchange(distribution=[hash[f]])
>   +- Calc(select=[a, c, f])
>  +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, 
> e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a]])
> :  +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[d]])
>+- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
>  {code}
> the 'select' of Join operator is [a, b, c, d, e, f], which means the 
> projection in the final Calc is not passed through the Rank.
> And I think an easy way to fix this issue is to add 
> org.apache.calcite.rel.rules.ProjectWindowTransposeRule into 
> FlinkStreamRuleSets.LOGICAL_OPT_RULES.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34529) Projection cannot be pushed down through rank operator.

2024-02-29 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822119#comment-17822119
 ] 

Benchao Li commented on FLINK-34529:


[~nilerzhou] Assigned to you~

> Projection cannot be pushed down through rank operator.
> ---
>
> Key: FLINK-34529
> URL: https://issues.apache.org/jira/browse/FLINK-34529
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: yisha zhou
>Assignee: yisha zhou
>Priority: Major
>
> When there is a rank/deduplicate operator, the projection based on output of 
> this operator cannot be pushed down to the input of it.
> The following code can help reproducing the issue:
> {code:java}
> val util = streamTestUtil() 
> util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
> util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
> val sql =
>   """
> |SELECT a FROM (
> |  SELECT a, f,
> |  ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
> |  FROM  T1, T2
> |  WHERE T1.a = T2.d
> |)
> |WHERE rank_num = 1
>   """.stripMargin
> util.verifyPlan(sql){code}
> The plan is expected to be:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>+- Exchange(distribution=[hash[f]])
>   +- Calc(select=[a, c, f])
>  +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a]])
> :  +- Calc(select=[a, c])
> : +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[d]])
>+- Calc(select=[d, f])
>   +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) 
> {code}
> Notice that the 'select' of Join operator is [a, c, d, f]. However the actual 
> plan is:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>+- Exchange(distribution=[hash[f]])
>   +- Calc(select=[a, c, f])
>  +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, 
> e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a]])
> :  +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[d]])
>+- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
>  {code}
> the 'select' of Join operator is [a, b, c, d, e, f], which means the 
> projection in the final Calc is not passed through the Rank.
> And I think an easy way to fix this issue is to add 
> org.apache.calcite.rel.rules.ProjectWindowTransposeRule into 
> FlinkStreamRuleSets.LOGICAL_OPT_RULES.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34529) Projection cannot be pushed down through rank operator.

2024-02-29 Thread Benchao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benchao Li reassigned FLINK-34529:
--

Assignee: yisha zhou

> Projection cannot be pushed down through rank operator.
> ---
>
> Key: FLINK-34529
> URL: https://issues.apache.org/jira/browse/FLINK-34529
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: yisha zhou
>Assignee: yisha zhou
>Priority: Major
>
> When there is a rank/deduplicate operator, the projection based on output of 
> this operator cannot be pushed down to the input of it.
> The following code can help reproducing the issue:
> {code:java}
> val util = streamTestUtil() 
> util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
> util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
> val sql =
>   """
> |SELECT a FROM (
> |  SELECT a, f,
> |  ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
> |  FROM  T1, T2
> |  WHERE T1.a = T2.d
> |)
> |WHERE rank_num = 1
>   """.stripMargin
> util.verifyPlan(sql){code}
> The plan is expected to be:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>+- Exchange(distribution=[hash[f]])
>   +- Calc(select=[a, c, f])
>  +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a]])
> :  +- Calc(select=[a, c])
> : +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[d]])
>+- Calc(select=[d, f])
>   +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) 
> {code}
> Notice that the 'select' of Join operator is [a, c, d, f]. However the actual 
> plan is:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>+- Exchange(distribution=[hash[f]])
>   +- Calc(select=[a, c, f])
>  +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, 
> e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a]])
> :  +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[d]])
>+- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
>  {code}
> the 'select' of Join operator is [a, b, c, d, e, f], which means the 
> projection in the final Calc is not passed through the Rank.
> And I think an easy way to fix this issue is to add 
> org.apache.calcite.rel.rules.ProjectWindowTransposeRule into 
> FlinkStreamRuleSets.LOGICAL_OPT_RULES.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34546] Emit span with failure labels on failure. [flink]

2024-02-29 Thread via GitHub


pnowojski commented on code in PR #24414:
URL: https://github.com/apache/flink/pull/24414#discussion_r1507543874


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java:
##
@@ -141,6 +155,51 @@ private CompletableFuture> 
labelFailure(Throwable cause, boo
 return FailureEnricherUtils.labelFailure(cause, ctx, 
mainThreadExecutor, failureEnrichers);
 }
 
+private FailureHandlingResult handleFailureAndReport(
+@Nullable final Execution failedExecution,
+final Throwable cause,
+long timestamp,
+final Set verticesToRestart,
+final boolean globalFailure) {
+
+FailureHandlingResult failureHandlingResult =
+handleFailure(failedExecution, cause, timestamp, 
verticesToRestart, globalFailure);
+
+if (reportEventsAsSpans) {
+// TODO: replace with reporting as event once events are supported.
+// Add reporting as callback for when the failure labeling is 
completed.
+failureHandlingResult
+.getFailureLabels()
+.thenAcceptAsync(
+labels -> 
reportFailureHandling(failureHandlingResult, labels),
+mainThreadExecutor);
+}
+
+return failureHandlingResult;
+}
+
+private void reportFailureHandling(
+FailureHandlingResult failureHandlingResult, Map 
failureLabels) {
+
+// Add base attributes
+SpanBuilder spanBuilder =
+Span.builder(ExecutionFailureHandler.class, "JobFailure")
+.setStartTsMillis(failureHandlingResult.getTimestamp())
+.setEndTsMillis(failureHandlingResult.getTimestamp())
+.setAttribute(
+"canRestart", 
String.valueOf(failureHandlingResult.canRestart()))
+.setAttribute(
+"isGlobalFailure",
+
String.valueOf(failureHandlingResult.isGlobalFailure()));

Review Comment:
   I see. I think it would still be better to document it, but I don't have 
strong preferences. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34546] Emit span with failure labels on failure. [flink]

2024-02-29 Thread via GitHub


StefanRRichter commented on code in PR #24414:
URL: https://github.com/apache/flink/pull/24414#discussion_r1507540785


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java:
##
@@ -141,6 +155,51 @@ private CompletableFuture> 
labelFailure(Throwable cause, boo
 return FailureEnricherUtils.labelFailure(cause, ctx, 
mainThreadExecutor, failureEnrichers);
 }
 
+private FailureHandlingResult handleFailureAndReport(
+@Nullable final Execution failedExecution,
+final Throwable cause,
+long timestamp,
+final Set verticesToRestart,
+final boolean globalFailure) {
+
+FailureHandlingResult failureHandlingResult =
+handleFailure(failedExecution, cause, timestamp, 
verticesToRestart, globalFailure);
+
+if (reportEventsAsSpans) {
+// TODO: replace with reporting as event once events are supported.
+// Add reporting as callback for when the failure labeling is 
completed.
+failureHandlingResult
+.getFailureLabels()
+.thenAcceptAsync(
+labels -> 
reportFailureHandling(failureHandlingResult, labels),
+mainThreadExecutor);
+}
+
+return failureHandlingResult;
+}
+
+private void reportFailureHandling(
+FailureHandlingResult failureHandlingResult, Map 
failureLabels) {
+
+// Add base attributes
+SpanBuilder spanBuilder =
+Span.builder(ExecutionFailureHandler.class, "JobFailure")
+.setStartTsMillis(failureHandlingResult.getTimestamp())
+.setEndTsMillis(failureHandlingResult.getTimestamp())
+.setAttribute(
+"canRestart", 
String.valueOf(failureHandlingResult.canRestart()))
+.setAttribute(
+"isGlobalFailure",
+
String.valueOf(failureHandlingResult.isGlobalFailure()));

Review Comment:
   That's what I had before, but since it's not enabled by default right now, 
I'd suggest we document this once it is reported as an event by default.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34546] Emit span with failure labels on failure. [flink]

2024-02-29 Thread via GitHub


pnowojski commented on code in PR #24414:
URL: https://github.com/apache/flink/pull/24414#discussion_r1507538335


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java:
##
@@ -141,6 +155,51 @@ private CompletableFuture> 
labelFailure(Throwable cause, boo
 return FailureEnricherUtils.labelFailure(cause, ctx, 
mainThreadExecutor, failureEnrichers);
 }
 
+private FailureHandlingResult handleFailureAndReport(
+@Nullable final Execution failedExecution,
+final Throwable cause,
+long timestamp,
+final Set verticesToRestart,
+final boolean globalFailure) {
+
+FailureHandlingResult failureHandlingResult =
+handleFailure(failedExecution, cause, timestamp, 
verticesToRestart, globalFailure);
+
+if (reportEventsAsSpans) {
+// TODO: replace with reporting as event once events are supported.
+// Add reporting as callback for when the failure labeling is 
completed.
+failureHandlingResult
+.getFailureLabels()
+.thenAcceptAsync(
+labels -> 
reportFailureHandling(failureHandlingResult, labels),
+mainThreadExecutor);
+}
+
+return failureHandlingResult;
+}
+
+private void reportFailureHandling(
+FailureHandlingResult failureHandlingResult, Map 
failureLabels) {
+
+// Add base attributes
+SpanBuilder spanBuilder =
+Span.builder(ExecutionFailureHandler.class, "JobFailure")
+.setStartTsMillis(failureHandlingResult.getTimestamp())
+.setEndTsMillis(failureHandlingResult.getTimestamp())
+.setAttribute(
+"canRestart", 
String.valueOf(failureHandlingResult.canRestart()))
+.setAttribute(
+"isGlobalFailure",
+
String.valueOf(failureHandlingResult.isGlobalFailure()));

Review Comment:
   Can you add this span to the documentation of reported spans?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34546] Emit span with failure labels on failure. [flink]

2024-02-29 Thread via GitHub


StefanRRichter commented on code in PR #24414:
URL: https://github.com/apache/flink/pull/24414#discussion_r1507536006


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java:
##
@@ -141,6 +150,48 @@ private CompletableFuture> 
labelFailure(Throwable cause, boo
 return FailureEnricherUtils.labelFailure(cause, ctx, 
mainThreadExecutor, failureEnrichers);
 }
 
+private FailureHandlingResult handleFailureAndReport(
+@Nullable final Execution failedExecution,
+final Throwable cause,
+long timestamp,
+final Set verticesToRestart,
+final boolean globalFailure) {
+
+FailureHandlingResult failureHandlingResult =
+handleFailure(failedExecution, cause, timestamp, 
verticesToRestart, globalFailure);
+
+// Add reporting as callback for when the failure labeling is 
completed.
+failureHandlingResult
+.getFailureLabels()
+.thenAcceptAsync(
+labels -> reportFailureHandling(failureHandlingResult, 
labels),
+mainThreadExecutor);
+
+return failureHandlingResult;
+}
+
+private void reportFailureHandling(
+FailureHandlingResult failureHandlingResult, Map 
failureLabels) {
+
+// Add base attributes
+SpanBuilder spanBuilder =
+Span.builder(ExecutionFailureHandler.class, "HandleFailure")
+.setStartTsMillis(failureHandlingResult.getTimestamp())
+.setEndTsMillis(System.currentTimeMillis())
+.setAttribute(
+"canRestart", 
String.valueOf(failureHandlingResult.canRestart()))
+.setAttribute(
+"isGlobalFailure",
+
String.valueOf(failureHandlingResult.isGlobalFailure()));
+
+// Add all failure labels
+for (Map.Entry entry : failureLabels.entrySet()) {
+spanBuilder.setAttribute(
+FAILURE_LABEL_ATTRIBUTE_PREFIX + entry.getKey(), 
entry.getValue());
+}
+metricGroup.addSpan(spanBuilder);

Review Comment:
   Yes, that's what I did :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34546] Emit span with failure labels on failure. [flink]

2024-02-29 Thread via GitHub


pnowojski commented on code in PR #24414:
URL: https://github.com/apache/flink/pull/24414#discussion_r1507534949


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java:
##
@@ -141,6 +150,48 @@ private CompletableFuture> 
labelFailure(Throwable cause, boo
 return FailureEnricherUtils.labelFailure(cause, ctx, 
mainThreadExecutor, failureEnrichers);
 }
 
+private FailureHandlingResult handleFailureAndReport(
+@Nullable final Execution failedExecution,
+final Throwable cause,
+long timestamp,
+final Set verticesToRestart,
+final boolean globalFailure) {
+
+FailureHandlingResult failureHandlingResult =
+handleFailure(failedExecution, cause, timestamp, 
verticesToRestart, globalFailure);
+
+// Add reporting as callback for when the failure labeling is 
completed.
+failureHandlingResult
+.getFailureLabels()
+.thenAcceptAsync(
+labels -> reportFailureHandling(failureHandlingResult, 
labels),
+mainThreadExecutor);
+
+return failureHandlingResult;
+}
+
+private void reportFailureHandling(
+FailureHandlingResult failureHandlingResult, Map 
failureLabels) {
+
+// Add base attributes
+SpanBuilder spanBuilder =
+Span.builder(ExecutionFailureHandler.class, "HandleFailure")
+.setStartTsMillis(failureHandlingResult.getTimestamp())
+.setEndTsMillis(System.currentTimeMillis())
+.setAttribute(
+"canRestart", 
String.valueOf(failureHandlingResult.canRestart()))
+.setAttribute(
+"isGlobalFailure",
+
String.valueOf(failureHandlingResult.isGlobalFailure()));
+
+// Add all failure labels
+for (Map.Entry entry : failureLabels.entrySet()) {
+spanBuilder.setAttribute(
+FAILURE_LABEL_ATTRIBUTE_PREFIX + entry.getKey(), 
entry.getValue());
+}
+metricGroup.addSpan(spanBuilder);

Review Comment:
   As discussed offline: for the time being, let's report failures via spans, 
but hidden behind a temporary feature toggle like "use spans instead of events". 
This option should be disabled by default and marked as deprecated from the 
beginning. Once Flink supports events, we can remove that option after a 
release or so.
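
   A minimal sketch of what such a temporary, deprecated toggle could look like 
with Flink's ConfigOptions builder. The option key, class name, and description 
below are made up for illustration and are not necessarily what this PR introduces.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public final class FailureReportingOptionsSketch {

    /**
     * Hypothetical toggle: report failure-handling information as a span until proper
     * event reporting exists. Disabled by default; deprecated from the start so it can
     * be dropped once events are supported.
     */
    @Deprecated
    public static final ConfigOption<Boolean> REPORT_EVENTS_AS_SPANS =
            ConfigOptions.key("traces.report-events-as-spans")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Temporary option: if enabled, failure handling is reported "
                                    + "as a span instead of an event.");

    private FailureReportingOptionsSketch() {}
}
```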



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


