[jira] [Closed] (FLINK-34481) Migrate SetOpRewriteUtil
[ https://issues.apache.org/jira/browse/FLINK-34481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacky Lau closed FLINK-34481. - Release Note: merged f523b9d6191ecb584e36aa2aeffcd0659ce231f7 Resolution: Fixed > Migrate SetOpRewriteUtil > > > Key: FLINK-34481 > URL: https://issues.apache.org/jira/browse/FLINK-34481 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner > Affects Versions: 1.20.0 > Reporter: Jacky Lau > Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > We should migrate SetOpRewriteUtil for > ReplaceMinusWithAntiJoinRule > RewriteIntersectAllRule > RewriteMinusAllRule -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34481) Migrate SetOpRewriteUtil
[ https://issues.apache.org/jira/browse/FLINK-34481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822434#comment-17822434 ] Jacky Lau commented on FLINK-34481: --- Merged f523b9d6191ecb584e36aa2aeffcd0659ce231f7 > Migrate SetOpRewriteUtil > > > Key: FLINK-34481 > URL: https://issues.apache.org/jira/browse/FLINK-34481 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner > Affects Versions: 1.20.0 > Reporter: Jacky Lau > Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > We should migrate SetOpRewriteUtil for > ReplaceMinusWithAntiJoinRule > RewriteIntersectAllRule > RewriteMinusAllRule -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [hotfix][runtime] Remove unneeded requestTaskManagerFileUploadByName [flink]
1996fanrui merged PR #24386: URL: https://github.com/apache/flink/pull/24386 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][runtime] Remove unneeded requestTaskManagerFileUploadByName [flink]
1996fanrui commented on PR #24386: URL: https://github.com/apache/flink/pull/24386#issuecomment-1972683847 Thanks @yuchen-ecnu for the review, merging~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join
[ https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822433#comment-17822433 ] Shuai Xu commented on FLINK-34380: -- Let me take a look. > Strange RowKind and records about intermediate output when using minibatch > join > --- > > Key: FLINK-34380 > URL: https://issues.apache.org/jira/browse/FLINK-34380 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime > Affects Versions: 1.19.0 > Reporter: xuyang > Priority: Major > Fix For: 1.19.0 > > > {code:java} > // Add it in CalcItCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > changelogRow("+I", java.lang.Integer.valueOf(1), "1"), > changelogRow("-U", java.lang.Integer.valueOf(1), "1"), > changelogRow("+U", java.lang.Integer.valueOf(1), "99"), > changelogRow("-D", java.lang.Integer.valueOf(1), "99") > ) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" > |CREATE TABLE t1 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) > """.stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" > |CREATE TABLE t2 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) > """.stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > } {code} > Output: > {code:java} > | op | a | b | a0 | b0 | > | +U | 1 | 1 | 1 | 99 | > | +U | 1 | 99 | 1 | 99 | > | -U | 1 | 1 | 1 | 99 | > | -D | 1 | 99 | 1 | 99 | > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34481][table] Migrate SetOpRewriteUtil to java [flink]
JingGe merged PR #24358: URL: https://github.com/apache/flink/pull/24358 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34522][core] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter [flink]
1996fanrui commented on PR #24387: URL: https://github.com/apache/flink/pull/24387#issuecomment-1972677393 Many thanks @HuangXingBo for the debugging. I'm on vacation and don't have my Mac with me, so my colleague @RocMarshal is helping to fix this for 1.19 first, since 1.19 will be released soon. This PR is related to master (1.20), so let us do it after that, thanks~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-xxxxx][api] Introduce DataStream, Partitioning and ProcessFunction [flink]
flinkbot commented on PR #24422: URL: https://github.com/apache/flink/pull/24422#issuecomment-1972653582 ## CI report: * cb4615792281b375a45da41795e9f1a0cd70ae2d UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34487) Integrate tools/azure-pipelines/build-python-wheels.yml into GHA nightly workflow
[ https://issues.apache.org/jira/browse/FLINK-34487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822420#comment-17822420 ] Muhammet Orazov commented on FLINK-34487: - Hey [~mapohl], I'd like to work on this. Could you please assign this to me? Thanks! > Integrate tools/azure-pipelines/build-python-wheels.yml into GHA nightly > workflow > - > > Key: FLINK-34487 > URL: https://issues.apache.org/jira/browse/FLINK-34487 > Project: Flink > Issue Type: Sub-task > Components: Build System / CI >Affects Versions: 1.19.0, 1.18.1, 1.20.0 >Reporter: Matthias Pohl >Priority: Major > Labels: github-actions > > Analogously to the [Azure Pipelines nightly > config|https://github.com/apache/flink/blob/e923d4060b6dabe650a8950774d176d3e92437c2/tools/azure-pipelines/build-apache-repo.yml#L183] > we want to generate the wheels artifacts in the GHA nightly workflow as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-xxxxx][api] Introduce DataStream, Partitioning and ProcessFunction [flink]
reswqa opened a new pull request, #24422: URL: https://github.com/apache/flink/pull/24422 ## What is the purpose of the change *This is the first PR for DataStream V2, and aims to implement FLIP-409: DataStream V2 Building Blocks: DataStream, Partitioning and ProcessFunction.* ## Brief change log - *Introduce the four types of `streams` in FLIP-409.* - *Introduce `ProcessFunction` and all its variants.* - *Introduce a new `ExecutionEnvironment` to submit jobs written in the new API.* - *Support the FLIP-27 based `Source` and `Sink-v2`.* ## Verifying this change This change added new tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - Documentation needs to be added in a subsequent PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34535] Support JobPlanInfo for the explain result [flink]
huyuanfeng2018 commented on PR #24411: URL: https://github.com/apache/flink/pull/24411#issuecomment-1972635698 > Hi @huyuanfeng2018 thanks for the PR. I have a few comments: > > * Please add `JSON_JOB_PLAN ` to the `python` API (e.g., to the `basic_operations.py`) and test it there as well (e.g., in `test_table_environment.py`) > * Why the `JSON_JOB_PLAN` keyword didn't end up in our parser (`Parser.tdd`, `Parser.jj`)? > * The main motivation behind this PR "' combine it with the parameter `pipeline.jobvertex-parallelism-overrides` to set up my task parallelism'" is not clear to me and is missing in this PR tests > * Add the necessary documentation to the `explain.md` > * Please add test with `Hive` as well (e.g., see `HiveTableSinkITCase::testHiveTableSinkWithParallelismBase` > * Add `JSON_JOB_PLAN ` to the `FlinkSqlParserImplConstants` > * Update `TableTestBase::doVerifyExplain` to support `JSON_JOB_PLAN` as well > * Tests with table API are missing Thank you very much for your review. There is still a lot of work to be done on this PR. I will mark it as a draft first. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34180) Accept Flink CDC project as part of Apache Flink
[ https://issues.apache.org/jira/browse/FLINK-34180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu updated FLINK-34180: --- Description: As discussed in Flink dev mailing list[1][2], we have accepted the Flink CDC project contribution, we should finish the repo and doc migration as soon as possible. [1] https://lists.apache.org/thread/sq5w21tcomrmb025tl820cxty9l0z26w [2] https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob was: As discussed in Flink dev mailing list[1][2], we have accepted the Flink CDC project contribution, we should finished the repo and doc migration as soon as possible. [1] https://lists.apache.org/thread/sq5w21tcomrmb025tl820cxty9l0z26w [2] https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob > Accept Flink CDC project as part of Apache Flink > > > Key: FLINK-34180 > URL: https://issues.apache.org/jira/browse/FLINK-34180 > Project: Flink > Issue Type: New Feature > Components: Flink CDC >Reporter: Leonard Xu >Assignee: Leonard Xu >Priority: Major > > As discussed in Flink dev mailing list[1][2], we have accepted the Flink > CDC project contribution, we should finish the repo and doc migration as soon > as possible. > [1] https://lists.apache.org/thread/sq5w21tcomrmb025tl820cxty9l0z26w > [2] https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34529) Projection cannot be pushed down through rank operator.
[ https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822415#comment-17822415 ] yisha zhou commented on FLINK-34529: Hi [~lincoln.86xy], thanks for your advice. After a discussion with [~libenchao], I agree that putting this kind of rule into the cost-based planner seems to be in line with the future trend. Meanwhile, I found that most of the ProjectXXTransposeRules are in `FlinkStreamRuleSets#PROJECT_RULES`, and `PROJECT_RULES` seems to be used both in 'LOGICAL' (volcano) and 'PROJECT_REWRITE' (hep). I plan to add `CoreRules.PROJECT_WINDOW_TRANSPOSE` to `FlinkStreamRuleSets#PROJECT_RULES` too, so that both kinds of planners can utilize the rule. WDYT? > Projection cannot be pushed down through rank operator. > --- > > Key: FLINK-34529 > URL: https://issues.apache.org/jira/browse/FLINK-34529 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.19.0 > Reporter: yisha zhou > Assignee: yisha zhou > Priority: Major > > When there is a rank/deduplicate operator, the projection based on the output of > this operator cannot be pushed down to the input of it.
> The following code can help reproduce the issue: > {code:java} > val util = streamTestUtil() > util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c) > util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f) > val sql = > """ > |SELECT a FROM ( > | SELECT a, f, > | ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num > | FROM T1, T2 > | WHERE T1.a = T2.d > |) > |WHERE rank_num = 1 > """.stripMargin > util.verifyPlan(sql){code} > The plan is expected to be: > {code:java} > Calc(select=[a]) > +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], > select=[a, c, f]) > +- Exchange(distribution=[hash[f]]) > +- Calc(select=[a, c, f]) > +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], > leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[hash[a]]) > : +- Calc(select=[a, c]) > : +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > +- Exchange(distribution=[hash[d]]) > +- Calc(select=[d, f]) > +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) > {code} > Notice that the 'select' of the Join operator is [a, c, d, f]. > However the actual plan is: > {code:java} > Calc(select=[a]) > +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], > select=[a, c, f]) > +- Exchange(distribution=[hash[f]]) > +- Calc(select=[a, c, f]) > +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], > leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[hash[a]]) > : +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > +- Exchange(distribution=[hash[d]]) > +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) > {code} > the 'select' of the Join operator is [a, b, c, d, e, f], which means the > projection in the final Calc is not passed through the Rank. > And I think an easy way to fix this issue is to add > org.apache.calcite.rel.rules.ProjectWindowTransposeRule into > FlinkStreamRuleSets.LOGICAL_OPT_RULES. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34548) FLIP-409: DataStream V2 Building Blocks: DataStream, Partitioning and ProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-34548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-34548: --- Labels: (was: Umbrella) > FLIP-409: DataStream V2 Building Blocks: DataStream, Partitioning and > ProcessFunction > - > > Key: FLINK-34548 > URL: https://issues.apache.org/jira/browse/FLINK-34548 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > > This is the umbrella ticket for FLIP-409: DataStream V2 Building Blocks: > DataStream, Partitioning and ProcessFunction. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34548) FLIP-409: DataStream V2 Building Blocks: DataStream, Partitioning and ProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-34548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-34548: --- Description: This is the ticket for FLIP-409: DataStream V2 Building Blocks: DataStream, Partitioning and ProcessFunction. (was: This is the umbrella ticket for FLIP-409: DataStream V2 Building Blocks: DataStream, Partitioning and ProcessFunction.) > FLIP-409: DataStream V2 Building Blocks: DataStream, Partitioning and > ProcessFunction > - > > Key: FLINK-34548 > URL: https://issues.apache.org/jira/browse/FLINK-34548 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > > This is the ticket for FLIP-409: DataStream V2 Building Blocks: DataStream, > Partitioning and ProcessFunction. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join
[ https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822401#comment-17822401 ] xuyang commented on FLINK-34380: Hi, [~xu_shuai_]. Can you help check it again? > Strange RowKind and records about intermediate output when using minibatch > join > --- > > Key: FLINK-34380 > URL: https://issues.apache.org/jira/browse/FLINK-34380 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime > Affects Versions: 1.19.0 > Reporter: xuyang > Priority: Major > Fix For: 1.19.0 > > > {code:java} > // Add it in CalcItCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > changelogRow("+I", java.lang.Integer.valueOf(1), "1"), > changelogRow("-U", java.lang.Integer.valueOf(1), "1"), > changelogRow("+U", java.lang.Integer.valueOf(1), "99"), > changelogRow("-D", java.lang.Integer.valueOf(1), "99") > ) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" > |CREATE TABLE t1 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) > """.stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" > |CREATE TABLE t2 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) > """.stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > } {code} > Output: > {code:java} > | op | a | b | a0 | b0 | > | +U | 1 | 1 | 1 | 99 | > | +U | 1 | 99 | 1 | 99 | > | -U | 1 | 1 | 1 | 99 | > | -D | 1 | 99 | 1 | 99 | > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34535) Support JobPlanInfo for the explain result
[ https://issues.apache.org/jira/browse/FLINK-34535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822399#comment-17822399 ] yuanfenghu commented on FLINK-34535: [~lincoln.86xy] Thank you for your comment. > Regarding to the motivation 'combine it with the parameter > `pipeline.jobvertex-parallelism-overrides` to set up my task parallelism', > could you explain more about it? The `pipeline.jobvertex-parallelism-overrides` parameter can modify the parallelism of each Flink vertex before Flink runs the job. It requires specifying the parallelism of each vertex in the JobGraph, like this: {code:java} // code placeholder pipeline.jobvertex-parallelism-overrides = 0a448493b4782967b150582570326227:4,bc764cd8ddf7a0cff126f51c16239658:3 {code} This way, when Flink runs, the parallelism of the corresponding vertex ids 0a448493b4782967b150582570326227 and bc764cd8ddf7a0cff126f51c16239658 will be set to 4 and 3 respectively. So my motivation is that I want to set the parallelism of each vertex in the job generated by Flink SQL, but in Flink SQL parallelism is set globally, so I need to get each of my jobVertexIds before the job is run. However, the existing explain does not return this information, so I want to return it in the explain result. > (Also adding new mode to current `ExplainDetail` is a public api change, > there should be a > [FLIP|https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals?src=contextnavpagetreemode] > discussion) I noticed ExplainDetail has a @PublicEvolving annotation, but my change is in fact relatively simple; is it necessary to create a separate FLIP? Maybe we could put this information in JSON_EXECUTION_PLAN instead? > COMPILE PLAN COMPILE PLAN also seems to be a reasonable way. Can you help me @ some friends who are more familiar with this area (Flink SQL) to discuss their views on this issue? > Support JobPlanInfo for the explain result > -- > > Key: FLINK-34535 > URL: https://issues.apache.org/jira/browse/FLINK-34535 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner > Reporter: yuanfenghu > Priority: Major > Labels: pull-request-available > > In the Flink Sql Explain syntax, we can set ExplainDetails to plan > JSON_EXECUTION_PLAN, but we cannot plan JobPlanInfo. If we can explain this > part of the information, referring to JobPlanInfo, I can combine it with the > parameter `pipeline.jobvertex-parallelism-overrides` to set up my task > parallelism -- This message was sent by Atlassian Jira (v8.20.10#820010)
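The workflow yuanfenghu describes above (read vertex ids from an explain result, then build the value for `pipeline.jobvertex-parallelism-overrides`) can be sketched in plain Python. Note this is only an illustration: the `SAMPLE_PLAN` JSON shape, the `"nodes"`/`"id"`/`"description"` field names, and the helper function are assumptions for the sketch, not the actual structure Flink's explain output is guaranteed to use.

```python
import json

# Hand-written sample standing in for a JSON execution plan; the field names
# here are assumptions for illustration, not verified Flink output.
SAMPLE_PLAN = """
{
  "nodes": [
    {"id": "0a448493b4782967b150582570326227", "description": "Source"},
    {"id": "bc764cd8ddf7a0cff126f51c16239658", "description": "Sink"}
  ]
}
"""

def build_parallelism_overrides(plan_json: str, overrides: dict) -> str:
    """Map vertex descriptions to ids and render the override config value."""
    plan = json.loads(plan_json)
    by_description = {n["description"]: n["id"] for n in plan["nodes"]}
    # Render as "vertexId:parallelism" pairs joined by commas, the format
    # shown in the comment above.
    parts = [f"{by_description[desc]}:{p}" for desc, p in overrides.items()]
    return ",".join(parts)

conf_value = build_parallelism_overrides(SAMPLE_PLAN, {"Source": 4, "Sink": 3})
print(conf_value)
```

The resulting string would then be set as the value of `pipeline.jobvertex-parallelism-overrides` before submitting the job.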
Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]
LadyForest commented on code in PR #24390: URL: https://github.com/apache/flink/pull/24390#discussion_r1508526295 ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/utils/PathUtils.java: ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.table.utils; + +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.UUID; + +/** Path utils for file system. */ +public class PathUtils { Review Comment: I originally planned to move the logic for generating the staging directory into `FileSystemOutputFormat`. However, when modifying the `HiveTableSink`, I found that `createBatchCompactSink` relies on tmpPath. I didn't want to introduce changes to other modules, so I ultimately opted for a compromise and extracted this utility method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-34535) Support JobPlanInfo for the explain result
[ https://issues.apache.org/jira/browse/FLINK-34535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822390#comment-17822390 ] lincoln lee edited comment on FLINK-34535 at 3/1/24 5:43 AM: - [~heigebupahei] The current `explain` syntax is just for showing the planinfo, if we want to override something to the real plan which is for execution, I suggest to extend current compiled plan, as we have such ways to compile a query and execute the compiled plan: {code:java} COMPILE PLAN FOR '' EXECUTE PLAN FOR ''{code} (Also adding new mode to current `ExplainDetail` is a public api change, there should be a [FLIP|https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals?src=contextnavpagetreemode] discussion) Regarding to the motivation 'combine it with the parameter `pipeline.jobvertex-parallelism-overrides` to set up my task parallelism', could you explain more about it? was (Author: lincoln.86xy): [~heigebupahei] The current `explain` syntax is just for showing the planinfo, if we want to override something to the real plan which is for execution, I suggest to extend current compiled plan, as we have such ways to compile a query and execute the compiled plan: {code:java} COMPILE PLAN FOR '' EXECUTE PLAN FOR ''{code} (Also adding new mode to current `ExplainDetail` is a public api change, there should be a [FLIP|https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals?src=contextnavpagetreemode] discussion) Regarding to the motivation 'combine it with the parameter `pipeline.jobvertex-parallelism-overrides` to set up my task parallelism', could you explain more about it? > Support JobPlanInfo for the explain result > -- > > Key: FLINK-34535 > URL: https://issues.apache.org/jira/browse/FLINK-34535 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner > Reporter: yuanfenghu > Priority: Major > Labels: pull-request-available > > In the Flink Sql Explain syntax, we can set ExplainDetails to plan > JSON_EXECUTION_PLAN, but we cannot plan JobPlanInfo. If we can explain this > part of the information, referring to JobPlanInfo, I can combine it with the > parameter `pipeline.jobvertex-parallelism-overrides` to set up my task > parallelism -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34535) Support JobPlanInfo for the explain result
[ https://issues.apache.org/jira/browse/FLINK-34535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822390#comment-17822390 ] lincoln lee commented on FLINK-34535: - [~heigebupahei] The current `explain` syntax is just for showing the planinfo, if we want to override something to the real plan which is for execution, I suggest to extend current compiled plan, as we have such ways to compile a query and execute the compiled plan: {code:java} COMPILE PLAN FOR '' EXECUTE PLAN FOR ''{code} (Also adding new mode to current `ExplainDetail` is a public api change, there should be a [FLIP|https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals?src=contextnavpagetreemode] discussion) Regarding to the motivation 'combine it with the parameter `pipeline.jobvertex-parallelism-overrides` to set up my task parallelism', could you explain more about it? > Support JobPlanInfo for the explain result > -- > > Key: FLINK-34535 > URL: https://issues.apache.org/jira/browse/FLINK-34535 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner > Reporter: yuanfenghu > Priority: Major > Labels: pull-request-available > > In the Flink Sql Explain syntax, we can set ExplainDetails to plan > JSON_EXECUTION_PLAN, but we cannot plan JobPlanInfo. If we can explain this > part of the information, referring to JobPlanInfo, I can combine it with the > parameter `pipeline.jobvertex-parallelism-overrides` to set up my task > parallelism -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34556][table] Migrate EnumerableToLogicalTableScan to java. [flink]
flinkbot commented on PR #24421: URL: https://github.com/apache/flink/pull/24421#issuecomment-1972492174 ## CI report: * c060447bb29118c3127c63eb09290a8c02c43e85 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34556) Migrate EnumerableToLogicalTableScan
[ https://issues.apache.org/jira/browse/FLINK-34556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34556: --- Labels: pull-request-available (was: ) > Migrate EnumerableToLogicalTableScan > > > Key: FLINK-34556 > URL: https://issues.apache.org/jira/browse/FLINK-34556 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Affects Versions: 1.20.0 >Reporter: Jacky Lau >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-34556][table] Migrate EnumerableToLogicalTableScan to java. [flink]
liuyongvs opened a new pull request, #24421: URL: https://github.com/apache/flink/pull/24421 ## What is the purpose of the change The PR migrates EnumerableToLogicalTableScan to java it doesn't touch EnumerableToLogicalTableScanTest to be sure that java version continues passing it ## Verifying this change This change is already covered by existing tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-34556) Migrate EnumerableToLogicalTableScan
Jacky Lau created FLINK-34556: - Summary: Migrate EnumerableToLogicalTableScan Key: FLINK-34556 URL: https://issues.apache.org/jira/browse/FLINK-34556 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Affects Versions: 1.20.0 Reporter: Jacky Lau Fix For: 1.20.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34483][docs] Improve the documentation of 'state.checkpoints.dir' and 'state.checkpoint-storage' [flink]
fredia commented on code in PR #24401: URL: https://github.com/apache/flink/pull/24401#discussion_r1508457644 ## flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java: ## @@ -76,6 +76,39 @@ public class CheckpointingOptions { * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called. * * Recognized shortcut names are 'jobmanager' and 'filesystem'. + * + * {@link #CHECKPOINT_STORAGE} and {@link #CHECKPOINTS_DIRECTORY} are usually combined to + * configure the checkpoint location. The behaviors of different combinations are as follows: Review Comment: Thanks for the suggestion, I reorganized the description as you suggested. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34522][core] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter [flink]
HuangXingBo commented on code in PR #24417: URL: https://github.com/apache/flink/pull/24417#discussion_r1508454782 ## flink-python/pyflink/datastream/state.py: ## @@ -809,7 +809,7 @@ def cleanup_incrementally(self, def cleanup_in_rocksdb_compact_filter( self, query_time_after_num_entries, -periodic_compaction_time=Time.days(30)) -> \ +periodic_compaction_time=Duration.of_days(30)) -> \ Review Comment: `Duration.of_days(30)` -> None ## flink-python/pyflink/datastream/state.py: ## @@ -925,14 +926,14 @@ class RocksdbCompactFilterCleanupStrategy(CleanupStrategy): def __init__(self, query_time_after_num_entries: int, - periodic_compaction_time=Time.days(30)): + periodic_compaction_time=Duration.of_days(30)): Review Comment: `Duration.of_days(30)-> None` and do the similar thing to `cleanup_in_rocksdb_compact_filter` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
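The review above asks to change the evaluated default `periodic_compaction_time=Duration.of_days(30)` to `None`. A common reason for that style is to keep a constructed object out of the signature and resolve a `None` sentinel inside the body instead. A minimal sketch of the pattern, using `datetime.timedelta` as a stand-in for PyFlink's `Duration` (the class below is illustrative only, not the actual PyFlink implementation):

```python
from datetime import timedelta

class RocksdbCompactFilterCleanupStrategy:
    """Hypothetical stand-in showing the None-sentinel default pattern."""

    DEFAULT_PERIODIC_COMPACTION = timedelta(days=30)

    def __init__(self, query_time_after_num_entries: int,
                 periodic_compaction_time=None):
        # Resolve the sentinel in the body, so the signature documents
        # "None means the default" rather than baking in a built value.
        if periodic_compaction_time is None:
            periodic_compaction_time = self.DEFAULT_PERIODIC_COMPACTION
        self.query_time_after_num_entries = query_time_after_num_entries
        self.periodic_compaction_time = periodic_compaction_time
```

The same change would apply to `cleanup_in_rocksdb_compact_filter`, as the reviewer notes.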
[jira] [Commented] (FLINK-34529) Projection cannot be pushed down through rank operator.
[ https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822363#comment-17822363 ] yisha zhou commented on FLINK-34529: [~libenchao] Thanks for assigning the ticket to me. On the point 'I would prefer to putting these transposing rules all in "LOGICAL" stage', I agree; I'll add `ProjectWindowTransposeRule` to the LOGICAL stage. With regard to the already-existing transpose rules in other stages, should we move them into LOGICAL in this MR or in an independent MR; which do you think is better? For the question 'I'm even wondering that if we really needs `CalcRankTransposeRule`', I've tried removing it and found that `ProjectWindowTransposeRule` completely covers the functionality of `CalcRankTransposeRule` (judging from the results of the tests introduced along with that rule) and even does a much better job in some cases. Therefore, I plan to remove this rule in this PR, WDYT? > Projection cannot be pushed down through rank operator. > --- > > Key: FLINK-34529 > URL: https://issues.apache.org/jira/browse/FLINK-34529 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: yisha zhou >Assignee: yisha zhou >Priority: Major > > When there is a rank/deduplicate operator, the projection based on output of > this operator cannot be pushed down to the input of it. 
> The following code can help reproducing the issue: > {code:java} > val util = streamTestUtil() > util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c) > util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f) > val sql = > """ > |SELECT a FROM ( > | SELECT a, f, > | ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num > | FROM T1, T2 > | WHERE T1.a = T2.d > |) > |WHERE rank_num = 1 > """.stripMargin > util.verifyPlan(sql){code} > The plan is expected to be: > {code:java} > Calc(select=[a]) > +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], > select=[a, c, f]) >+- Exchange(distribution=[hash[f]]) > +- Calc(select=[a, c, f]) > +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], > leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[hash[a]]) > : +- Calc(select=[a, c]) > : +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > +- Exchange(distribution=[hash[d]]) >+- Calc(select=[d, f]) > +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) > {code} > Notice that the 'select' of Join operator is [a, c, d, f]. 
However the actual > plan is: > {code:java} > Calc(select=[a]) > +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], > select=[a, c, f]) >+- Exchange(distribution=[hash[f]]) > +- Calc(select=[a, c, f]) > +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, > e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[hash[a]]) > : +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > +- Exchange(distribution=[hash[d]]) >+- LegacyTableSourceScan(table=[[default_catalog, > default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) > {code} > the 'select' of Join operator is [a, b, c, d, e, f], which means the > projection in the final Calc is not passed through the Rank. > And I think an easy way to fix this issue is to add > org.apache.calcite.rel.rules.ProjectWindowTransposeRule into > FlinkStreamRuleSets.LOGICAL_OPT_RULES. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34529) Projection cannot be pushed down through rank operator.
[ https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822362#comment-17822362 ] lincoln lee commented on FLINK-34529: - [~nilerzhou] Thanks for reporting this! As you folks discussed above, there indeed needs to be a projection pushdown, and reusing Calcite's core rules is always the first choice for Flink (except for the streaming-specific things, for now). And for this case itself, can we just add `CoreRules.PROJECT_WINDOW_TRANSPOSE` into `FlinkStreamRuleSets#DEFAULT_REWRITE_RULES` (which will take effect during the `DEFAULT_REWRITE` RBO phase)? As in the current code, we tend to treat projection/predicate pushdown as a deterministic optimization. WDYT? > Projection cannot be pushed down through rank operator. > --- > > Key: FLINK-34529 > URL: https://issues.apache.org/jira/browse/FLINK-34529 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: yisha zhou >Assignee: yisha zhou >Priority: Major > > When there is a rank/deduplicate operator, the projection based on output of > this operator cannot be pushed down to the input of it. 
[jira] [Updated] (FLINK-34555) Migrate JoinConditionTypeCoerceRule
[ https://issues.apache.org/jira/browse/FLINK-34555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34555: --- Labels: pull-request-available (was: ) > Migrate JoinConditionTypeCoerceRule > --- > > Key: FLINK-34555 > URL: https://issues.apache.org/jira/browse/FLINK-34555 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Affects Versions: 1.20.0 >Reporter: Jacky Lau >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > >
Re: [PR] [FLINK-34555][table] Migrate JoinConditionTypeCoerceRule to java. [flink]
flinkbot commented on PR #24420: URL: https://github.com/apache/flink/pull/24420#issuecomment-1972439916 ## CI report: * 0183b6bb862ceaf1ef3ff96896ac5d673ba59493 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] Join condition type coerce rule [flink]
liuyongvs opened a new pull request, #24420: URL: https://github.com/apache/flink/pull/24420 ## What is the purpose of the change The PR migrates JoinConditionTypeCoerceRule to Java. It doesn't touch JoinConditionTypeCoerceRuleTest, to be sure that the Java version continues passing it. ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
[jira] [Created] (FLINK-34555) Migrate JoinConditionTypeCoerceRule
Jacky Lau created FLINK-34555: - Summary: Migrate JoinConditionTypeCoerceRule Key: FLINK-34555 URL: https://issues.apache.org/jira/browse/FLINK-34555 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Affects Versions: 1.20.0 Reporter: Jacky Lau Fix For: 1.20.0
Re: [PR] [BP-1.18][FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]
flinkbot commented on PR #24419: URL: https://github.com/apache/flink/pull/24419#issuecomment-1972391441 ## CI report: * b514f6bb29bd04049073b0762eacbce63d85e5ba UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]
JustinLeesin commented on PR #24397: URL: https://github.com/apache/flink/pull/24397#issuecomment-1972389996 > @JustinLeesin Thanks for update. LGTM. I'll merge to master after 1.19 branch cut @luoyuxia OK, thank you for taking the time to review my code.
[jira] [Commented] (FLINK-16627) Support only generate non-null values when serializing into JSON
[ https://issues.apache.org/jira/browse/FLINK-16627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822358#comment-17822358 ] Benchao Li commented on FLINK-16627: [~nilerzhou] Thank you for the interest of contributing to Flink, I assigned this to you~ > Support only generate non-null values when serializing into JSON > > > Key: FLINK-16627 > URL: https://issues.apache.org/jira/browse/FLINK-16627 > Project: Flink > Issue Type: New Feature > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table > SQL / Planner >Affects Versions: 1.10.0 >Reporter: jackray wang >Assignee: yisha zhou >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > auto-unassigned, pull-request-available, sprint > > {code:java} > //sql > CREATE TABLE sink_kafka ( subtype STRING , svt STRING ) WITH (……) > {code} > > {code:java} > //sql > CREATE TABLE source_kafka ( subtype STRING , svt STRING ) WITH (……) > {code} > > {code:java} > //scala udf > class ScalaUpper extends ScalarFunction { > def eval(str: String) : String= { >if(str == null){ >return "" >}else{ >return str >} > } > > } > btenv.registerFunction("scala_upper", new ScalaUpper()) > {code} > > {code:java} > //sql > insert into sink_kafka select subtype, scala_upper(svt) from source_kafka > {code} > > > > Sometimes the svt's value is null, inert into kafkas json like > \{"subtype":"qin","svt":null} > If the amount of data is small, it is acceptable,but we process 10TB of data > every day, and there may be many nulls in the json, which affects the > efficiency. If you can add a parameter to remove the null key when defining a > sinktable, the performance will be greatly improved > > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
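The request above is to drop null-valued keys when serializing rows to JSON, to shrink payloads at the reporter's 10 TB/day scale. As a rough plain-Python illustration of the intended behavior (this is not Flink's JSON format code, just a sketch of the idea):

```python
import json

def to_json_drop_nulls(row: dict) -> str:
    """Serialize a row to JSON, omitting keys whose value is None."""
    return json.dumps({k: v for k, v in row.items() if v is not None})

row = {"subtype": "qin", "svt": None}
# Default serialization keeps the null:      {"subtype": "qin", "svt": null}
# Dropping nulls yields a smaller payload:   {"subtype": "qin"}
```

A consumer that treats a missing key the same as an explicit null sees identical semantics, which is why a sink-table-level switch is feasible.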
[jira] [Assigned] (FLINK-16627) Support only generate non-null values when serializing into JSON
[ https://issues.apache.org/jira/browse/FLINK-16627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Benchao Li reassigned FLINK-16627: -- Assignee: yisha zhou > Support only generate non-null values when serializing into JSON
[jira] [Commented] (FLINK-34536) Support reading long value as Timestamp column in JSON format
[ https://issues.apache.org/jira/browse/FLINK-34536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822357#comment-17822357 ] Benchao Li commented on FLINK-34536: It sounds like a useful feature from the user perspective. Usually a numeric representation is a unixtime, and it would be much like {{cast(numeric as timestamp[_ltz])}}, which has been discussed in [FLIP-162|https://cwiki.apache.org/confluence/display/FLINK/FLIP-162%3A+Consistent+Flink+SQL+time+function+behavior#FLIP162:ConsistentFlinkSQLtimefunctionbehavior-2.DisableCASTbetweenNUMERICandTIMESTAMP]. Hence it should be {{TIMESTAMP_LTZ}} instead of {{TIMESTAMP}} that is allowed to be converted from numeric. Besides, the precision of unixtime has multiple choices (second/millisecond/microsecond), so how would you propose to distinguish them when converting to {{TIMESTAMP_LTZ}}? For now, users can handle the conversion in a SQL expression via {{to_timestamp_ltz}} as an alternative, so I don't have a strong opinion on whether to introduce a feature at the format level. Anyway, I think this deserves a FLIP since the JSON format is a very fundamental format and this would be a public API change. > Support reading long value as Timestamp column in JSON format > - > > Key: FLINK-34536 > URL: https://issues.apache.org/jira/browse/FLINK-34536 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.19.0 >Reporter: yisha zhou >Priority: Major > > In many scenarios, timestamp data is stored as a Long value and expected to be > operated on as a Timestamp value. It's not user-friendly to use a UDF to > convert the data before operating on it. > Meanwhile, in Avro format, it seems it can receive several types of input and > convert it into TimestampData. Hope the same ability can be introduced into > JSON format.
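The precision question raised above (second vs. millisecond vs. microsecond unixtime) is the crux: the same integer decodes to very different instants depending on the assumed unit, so any format-level support would have to declare the unit explicitly. A plain-Python sketch of that ambiguity (illustrative only, not a proposed Flink API):

```python
from datetime import datetime, timezone

def unix_to_utc(value: int, precision: str) -> datetime:
    """Decode an epoch integer under an explicitly declared precision."""
    divisor = {"sec": 1, "ms": 1_000, "us": 1_000_000}[precision]
    return datetime.fromtimestamp(value / divisor, tz=timezone.utc)

# One integer, three readings: under "sec" 1_700_000_000 falls in 2023,
# under "ms" it lands in January 1970, and under "us" it is mere seconds
# after the epoch.
```

This is the distinction `to_timestamp_ltz(numeric, precision)` already forces users to make in SQL today.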
Re: [PR] [FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]
JustinLeesin commented on PR #24397: URL: https://github.com/apache/flink/pull/24397#issuecomment-1972385924 > @JustinLeesin Could you please cherry pick it to release-1.18. I'll merge to 1.18 firstly. @luoyuxia Done, [24419](https://github.com/apache/flink/pull/24419)
[PR] [BP-1.18][FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]
JustinLeesin opened a new pull request, #24419: URL: https://github.com/apache/flink/pull/24419 1.18 backport for parent PR [24397](https://github.com/apache/flink/pull/24397)
[jira] [Closed] (FLINK-34521) Using the Duration instead of the deprecated Time classes
[ https://issues.apache.org/jira/browse/FLINK-34521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan closed FLINK-34521. --- Resolution: Duplicate > Using the Duration instead of the deprecated Time classes > - > > Key: FLINK-34521 > URL: https://issues.apache.org/jira/browse/FLINK-34521 > Project: Flink > Issue Type: Technical Debt >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Fix For: 1.20.0 > > > FLINK-32570 deprecated org.apache.flink.api.common.time.Time and > org.apache.flink.streaming.api.windowing.time.Time. > We should refactor all internal callers from Time to Duration. (Public > callers should be removed in 2.0).
[jira] [Commented] (FLINK-34521) Using the Duration instead of the deprecated Time classes
[ https://issues.apache.org/jira/browse/FLINK-34521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822350#comment-17822350 ] Rui Fan commented on FLINK-34521: - Thanks for the reminder, I closed this Jira. > Using the Duration instead of the deprecated Time classes > -
Re: [PR] [FLINK-32076][checkpoint] Introduce file pool to reuse files [flink]
flinkbot commented on PR #24418: URL: https://github.com/apache/flink/pull/24418#issuecomment-1972349443 ## CI report: * 61e97462d4566a1ae8f17ceb32e42f68f92631a6 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-32076) Add file pool for concurrent file reusing
[ https://issues.apache.org/jira/browse/FLINK-32076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32076: --- Labels: pull-request-available (was: ) > Add file pool for concurrent file reusing > - > > Key: FLINK-32076 > URL: https://issues.apache.org/jira/browse/FLINK-32076 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Zakelly Lan >Assignee: Hangxiang Yu >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > >
[PR] [FLINK-32076][checkpoint] Introduce file pool to reuse files [flink]
masteryhx opened a new pull request, #24418: URL: https://github.com/apache/flink/pull/24418 ## What is the purpose of the change Add file pool for concurrent file reusing ## Brief change log - Introduce PhysicalFilePool and use it in FileMergingSnapshotManager - Support different type of PhysicalFilePool ## Verifying this change This change added tests and can be verified as follows: - *Added testConcurrentFileReusingWithNonBlockingPool and testConcurrentFileReusingWithBlockingPool* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), **Checkpointing**, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
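The blocking vs. non-blocking pool variants mentioned above can be pictured with a small queue-based sketch: a blocking pool waits for a file to be returned before reusing it, while a non-blocking pool creates a fresh file when none is free. This only illustrates the policy difference under assumed names (`FilePool`, `poll`); it is not the actual `PhysicalFilePool` code:

```python
import queue

class FilePool:
    """Minimal sketch of a pool that hands out reusable file handles.

    blocking=True  -> poll() waits until a file is returned (bounded reuse)
    blocking=False -> poll() creates a fresh file when none is available
    """

    def __init__(self, blocking: bool):
        self.blocking = blocking
        self._files = queue.Queue()
        self._created = 0

    def put(self, f):
        # A writer returns a file so a concurrent writer can reuse it.
        self._files.put(f)

    def poll(self, timeout=1.0):
        if self.blocking:
            return self._files.get(timeout=timeout)  # wait for a returned file
        try:
            return self._files.get_nowait()
        except queue.Empty:
            self._created += 1
            return f"new-file-{self._created}"  # non-blocking: open a new one
```

The blocking flavor caps the number of physical files at the cost of possible waiting; the non-blocking flavor never blocks a checkpoint writer but may create more files.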
[jira] [Commented] (FLINK-33989) Insert Statement With Filter Operation Generates Extra Tombstone using Upsert Kafka Connector
[ https://issues.apache.org/jira/browse/FLINK-33989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822337#comment-17822337 ] xuyang commented on FLINK-33989: Agree with [~libenchao] . This is a behavior by design. > Insert Statement With Filter Operation Generates Extra Tombstone using Upsert > Kafka Connector > - > > Key: FLINK-33989 > URL: https://issues.apache.org/jira/browse/FLINK-33989 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka, Table SQL / Runtime >Affects Versions: 1.17.2 >Reporter: Flaviu Cicio >Priority: Major > > Given the following Flink SQL tables: > {code:sql} > CREATE TABLE input ( > id STRING NOT NULL, > current_value STRING NOT NULL, > PRIMARY KEY (id) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'input', > 'key.format' = 'raw', > 'properties.bootstrap.servers' = 'kafka:29092', > 'properties.group.id' = 'your_group_id', > 'value.format' = 'json' > ); > CREATE TABLE output ( > id STRING NOT NULL, > current_value STRING NOT NULL, > PRIMARY KEY (id) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'output', > 'key.format' = 'raw', > 'properties.bootstrap.servers' = 'kafka:29092', > 'properties.group.id' = 'your_group_id', > 'value.format' = 'json' > ); {code} > And, the following entries are present in the input Kafka topic: > {code:json} > [ > { > "id": "1", > "current_value": "abc" > }, > { > "id": "1", > "current_value": "abcd" > } > ]{code} > If we execute the following statement: > {code:sql} > INSERT INTO output SELECT id, current_value FROM input; {code} > The following entries are published to the output Kafka topic: > {code:json} > [ > { > "id": "1", > "current_value": "abc" > }, > { > "id": "1", > "current_value": "abcd" > } > ]{code} > But, if we execute the following statement: > {code:sql} > INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1'); > {code} > The following entries are published: > {code:json} > [ > { > "id": "1", > 
"current_value": "abc" > }, > null, > { > "id": "1", > "current_value": "abcd" > } > ]{code} > We would expect the result to be the same for both insert statements. > As we can see, there is an extra tombstone generated as a result of the > second statement. > > Moreover, if we make a select on the input table: > {code:sql} > SELECT * FROM input; > {code} > We will get the following entries: > ||op||id||current_value|| > |I|1|abc| > |-U|1|abc| > |+U|1|abcd| > We expected to see only the insert and the update_after entries. > The update_before is added at DeduplicateFunctionHelper#122. > This is easily reproducible with this test that we added in the > UpsertKafkaTableITCase from flink-connector-kafka: > {code:java} > @Test > public void testAggregateFilterOmit() throws Exception { > String topic = COUNT_FILTER_TOPIC + "_" + format; > createTestTopic(topic, 1, 1); > env.setParallelism(1); > // - test --- > countFilterToUpsertKafkaOmitUpdateBefore(topic); > // - clean up --- > deleteTestTopic(topic); > } > private void countFilterToUpsertKafkaOmitUpdateBefore(String table) > throws Exception { > String bootstraps = getBootstrapServers(); > List data = > Arrays.asList( > Row.of(1, "Hi"), > Row.of(1, "Hello"), > Row.of(2, "Hello world"), > Row.of(2, "Hello world, how are you?"), > Row.of(2, "I am fine."), > Row.of(3, "Luke Skywalker"), > Row.of(3, "Comment#1"), > Row.of(3, "Comment#2"), > Row.of(4, "Comment#3"), > Row.of(4, null)); > final String createSource = > String.format( > "CREATE TABLE aggfilter_%s (" > + " `id` INT,\n" > + " `comment` STRING\n" > + ") WITH (" > + " 'connector' = 'values'," > + " 'data-id' = '%s'" > + ")", > format, TestValuesTableFactory.registerData(data)); > tEnv.executeSql(createSource); > final String createSinkTable = > String.format( > "CREATE TABLE %s (\n" > + " `id` INT,\n" > + " `comment`
Re: [PR] [FLINK-34522][core] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter [flink]
flinkbot commented on PR #24417: URL: https://github.com/apache/flink/pull/24417#issuecomment-1972320798 ## CI report: * c116a9c297479ebddb59864bda83ec8dfd43bb4b UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-34522][core] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter [flink]
RocMarshal opened a new pull request, #24417: URL: https://github.com/apache/flink/pull/24417 - What is the purpose of the change [FLINK-32570](https://issues.apache.org/jira/browse/FLINK-32570) deprecated the Time class and refactored all Public or PublicEvolving APIs to use Java's Duration. StateTtlConfig.Builder#cleanupInRocksdbCompactFilter is still using the Time class. In general, we would expect: mark cleanupInRocksdbCompactFilter(long, Time) as @Deprecated and provide a new cleanupInRocksdbCompactFilter(long, Duration). But I found this method was introduced in 1.19 ([FLINK-30854](https://issues.apache.org/jira/browse/FLINK-30854)), so a better solution may be: only provide cleanupInRocksdbCompactFilter(long, Duration) and don't use Time at all. A deprecated API should be kept for 2 minor versions; IIUC, we cannot remove the Time-related classes in Flink 2.0 if we don't deprecate them in 1.19. If so, I think it's better to merge this JIRA in 1.19.0 as well. - Brief change log [[FLINK-34522](https://issues.apache.org/jira/browse/FLINK-34522)][core] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter Fix based on https://github.com/apache/flink/pull/24388 & https://github.com/apache/flink/pull/24387#issuecomment-1971427877
[jira] [Updated] (FLINK-34550) attempted task still report metric of currentEmitEventTimeLag
[ https://issues.apache.org/jira/browse/FLINK-34550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] siwei.gao updated FLINK-34550: -- Attachment: image-2024-03-01-09-51-08-909.png > attempted task still report metric of currentEmitEventTimeLag > - > > Key: FLINK-34550 > URL: https://issues.apache.org/jira/browse/FLINK-34550 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka > Affects Versions: 1.17.1 > Environment: flink version: 1.17.1 > kafka-connector: 1.17.1 > Reporter: siwei.gao > Priority: Major > Labels: streamsource > Attachments: image-2024-02-29-21-41-01-709.png, > image-2024-02-29-21-43-18-340.png, image-2024-02-29-21-50-55-160.png, > image-2024-03-01-09-51-08-909.png > > > Attempted tasks still report the currentEmitEventTimeLag metric when using the > kafka-connector. The metric is reported for multiple task attempts with the > same subtask_index but different task_attempt_num within the same > time period. !image-2024-02-29-21-43-18-340.png|width=990,height=237! > Only the metric whose task_attempt_num is 4 should be reported. > This occurs on a TaskManager with multiple slots; it does not happen when the > TaskManager has only one slot. > !image-2024-02-29-21-50-55-160.png|width=973,height=730! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34550) attempted task still report metric of currentEmitEventTimeLag
[ https://issues.apache.org/jira/browse/FLINK-34550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] siwei.gao updated FLINK-34550: -- Description: Attempted tasks still report the currentEmitEventTimeLag metric when using the kafka-connector. The metric is reported for multiple task attempts with the same subtask_index but different task_attempt_num within the same time period. !image-2024-03-01-09-51-08-909.png|width=992,height=242! Only the metric whose task_attempt_num is 4 should be reported. This occurs on a TaskManager with multiple slots; it does not happen when the TaskManager has only one slot. !image-2024-02-29-21-50-55-160.png|width=973,height=730! was: Attempted tasks still report the currentEmitEventTimeLag metric when using the kafka-connector. The metric is reported for multiple task attempts with the same subtask_index but different task_attempt_num within the same time period. !image-2024-02-29-21-43-18-340.png|width=990,height=237! Only the metric whose task_attempt_num is 4 should be reported. This occurs on a TaskManager with multiple slots; it does not happen when the TaskManager has only one slot. !image-2024-02-29-21-50-55-160.png|width=973,height=730! > attempted task still report metric of currentEmitEventTimeLag > - > > Key: FLINK-34550 > URL: https://issues.apache.org/jira/browse/FLINK-34550 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka > Affects Versions: 1.17.1 > Environment: flink version: 1.17.1 > kafka-connector: 1.17.1 > Reporter: siwei.gao > Priority: Major > Labels: streamsource > Attachments: image-2024-02-29-21-41-01-709.png, > image-2024-02-29-21-43-18-340.png, image-2024-02-29-21-50-55-160.png, > image-2024-03-01-09-51-08-909.png > > > Attempted tasks still report the currentEmitEventTimeLag metric when using the > kafka-connector. The metric is reported for multiple task attempts with the > same subtask_index but different task_attempt_num within the same > time period. 
!image-2024-03-01-09-51-08-909.png|width=992,height=242! > Only the metric whose task_attempt_num is 4 should be reported. > This occurs on a TaskManager with multiple slots; it does not happen when the > TaskManager has only one slot. > !image-2024-02-29-21-50-55-160.png|width=973,height=730! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34550) attempted task still report metric of currentEmitEventTimeLag
[ https://issues.apache.org/jira/browse/FLINK-34550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] siwei.gao updated FLINK-34550: -- Description: Attempted tasks still report the currentEmitEventTimeLag metric when using the kafka-connector. The metric is reported for multiple task attempts with the same subtask_index but different task_attempt_num within the same time period. !image-2024-02-29-21-43-18-340.png|width=990,height=237! Only the metric whose task_attempt_num is 4 should be reported. This occurs on a TaskManager with multiple slots; it does not happen when the TaskManager has only one slot. !image-2024-02-29-21-50-55-160.png|width=973,height=730! was: Attempted tasks still report the currentEmitEventTimeLag metric when using the kafka-connector. The metric is reported for multiple task attempts with the same subtask_index but different task_attempt_num within the same time period. !image-2024-02-29-21-43-18-340.png! Only the metric whose task_attempt_num is 4 should be reported. This occurs on a TaskManager with multiple slots; it does not happen when the TaskManager has only one slot. !image-2024-02-29-21-50-55-160.png! > attempted task still report metric of currentEmitEventTimeLag > - > > Key: FLINK-34550 > URL: https://issues.apache.org/jira/browse/FLINK-34550 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka > Affects Versions: 1.17.1 > Environment: flink version: 1.17.1 > kafka-connector: 1.17.1 > Reporter: siwei.gao > Priority: Major > Labels: streamsource > Attachments: image-2024-02-29-21-41-01-709.png, > image-2024-02-29-21-43-18-340.png, image-2024-02-29-21-50-55-160.png > > > Attempted tasks still report the currentEmitEventTimeLag metric when using the > kafka-connector. The metric is reported for multiple task attempts with the > same subtask_index but different task_attempt_num within the same > time period. !image-2024-02-29-21-43-18-340.png|width=990,height=237! 
> Only the metric whose task_attempt_num is 4 should be reported. > This occurs on a TaskManager with multiple slots; it does not happen when the > TaskManager has only one slot. > !image-2024-02-29-21-50-55-160.png|width=973,height=730! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34553) Time travel support by Flink catalogs
[ https://issues.apache.org/jira/browse/FLINK-34553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822332#comment-17822332 ] Feng Jin commented on FLINK-34553: -- From what I currently know, only Paimon supports Flink's time travel. For more details, please refer to the documentation: https://paimon.apache.org/docs/master/how-to/querying-tables/#batch-time-travel And the source code: https://github.com/apache/incubator-paimon/blob/master/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java > Time travel support by Flink catalogs > - > > Key: FLINK-34553 > URL: https://issues.apache.org/jira/browse/FLINK-34553 > Project: Flink > Issue Type: Technical Debt > Reporter: Mehmet Aktas > Priority: Major > > I am trying to add time travel support for the Flink backend in > [Ibis|https://github.com/ibis-project/ibis]. > I found that Flink requires the {{catalog}} to implement > {{getTable(ObjectPath tablePath, long timestamp)}} for time travel support: > Attention: Currently, time travel requires the corresponding catalog that the > table belongs to implementing the getTable(ObjectPath tablePath, long > timestamp) method. See more details in Catalog. > [[https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/time-travel/]] > The default {{GenericInMemoryCatalog}} does not seem to implement > {{getTable()}}. I set up a {{hive metastore}} and created a {{hive > catalog}}, but it turns out that the hive catalog also does not implement > {{getTable()}} -- I wish Flink docs were more detailed about these ... > py4j.protocol.Py4JJavaError: An error occurred while calling o8.sqlQuery. : > org.apache.flink.table.api.ValidationException: SQL validation failed. > getTable(ObjectPath, long) is not implemented for class > org.apache.flink.table.catalog.hive.HiveCatalog. 
> I have two options now to continue with this: * Try another catalog, like the > Iceberg catalog, to see if that one implements {{getTable()}} > ** I am not able to find information on whether a given catalog > implements this function. Should I dig into their source code to figure this > out, or is there any other way to find out? > * Implement a custom catalog with {{getTable()}}, following the > *very brief instructions* given in the [Flink > doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#interface-in-catalog-for-supporting-time-travel]. > Is there a doc, article, mailing list or anything else that I can use to get > more information on > * Which catalogs implement time travel support? > * If there is no readily available catalog supporting time travel, then how > can we implement a custom catalog with time travel support? > ** The instructions given > [here|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#interface-in-catalog-for-supporting-time-travel] > are unfortunately not sufficient for a Java-illiterate person like myself. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
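The `getTable(ObjectPath, long)` contract discussed in this thread boils down to a floor lookup over snapshot commit timestamps: return the newest table snapshot taken at or before the requested time. A minimal self-contained sketch follows; the store and schema strings are invented stand-ins, not Flink's actual Catalog interface — only the lookup semantics carry over.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for the time-travel lookup a catalog must answer.
class TimeTravelSketch {

    static final class SnapshotStore {
        // snapshots keyed by commit timestamp (epoch millis)
        private final NavigableMap<Long, String> snapshots = new TreeMap<>();

        void commit(long timestampMs, String schema) {
            snapshots.put(timestampMs, schema);
        }

        // Mirrors getTable(ObjectPath, long): newest snapshot with ts <= timestampMs.
        String getTableAsOf(long timestampMs) {
            Map.Entry<Long, String> e = snapshots.floorEntry(timestampMs);
            if (e == null) {
                throw new IllegalArgumentException("no snapshot at or before " + timestampMs);
            }
            return e.getValue();
        }
    }
}
```

A catalog that cannot answer this question (like `GenericInMemoryCatalog` or `HiveCatalog`, per the report above) has nothing to return for a historical timestamp, which is why the validation error is thrown.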
Re: [PR] [FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]
luoyuxia commented on PR #24397: URL: https://github.com/apache/flink/pull/24397#issuecomment-1972271813 @JustinLeesin Could you please cherry pick it to release-1.18. I'll merge to 1.18 firstly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34542][docs] improved example quickstart build.gradle [flink]
flinkbot commented on PR #24416: URL: https://github.com/apache/flink/pull/24416#issuecomment-1972195868 ## CI report: * 0d64603764567494fefd05b5847a60795af949fd UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34542) Improve Gradle Quick Start build.gradle with Better Gradle API Alternatives
[ https://issues.apache.org/jira/browse/FLINK-34542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lennon Yu updated FLINK-34542: -- Description: This is a ticket of misc. improvements to the build.gradle script provided at {{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> {{Getting Started}}: * Add a {{mergeServiceFiles()}} call to the {{shadowJar}} configuration block ** Its absence will cause class-not-found errors in SPI-related class loading if the user has multiple connectors/formats in their implementation. * Move the top-level {{mainClassName}} project property setting into application { mainClass = 'foo.Bar' } ** This is because the top-level mainClassName property will be deprecated in Gradle 9.0+ * Replace the use of the {{sourceCompatibility}} and {{targetCompatibility}} properties with java { toolchain { languageVersion = JavaLanguageVersion.of(11) } } ** This is the way recommended by Gradle to streamline language version configuration. ** Also, the originally configured Java version - Java 8 - is getting close to its terminal support phase, and it's better to move on to Java 11. was: This is a ticket of misc. improvements to the build.gradle script provided at {{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> {{Getting Started}}: * Add a {{mergeServiceFiles()}} call to the {{shadowJar}} configuration block ** Its absence will cause class-not-found errors in SPI-related class loading if the user has multiple connectors/formats in their implementation. 
* Move the top-level {{mainClassName}} project property setting into application { mainClass = 'foo.Bar' } ** This is because the top-level mainClassName property will be deprecated in Gradle 9.0+ * Replace the use of the {{sourceCompatibility}} and {{targetCompatibility}} properties with java { toolchain { languageVersion = JavaLanguageVersion.of(11) } } ** This is the way recommended by Gradle to streamline language version configuration. ** Also, the originally configured Java version - Java 8 - is getting close to its terminal support phase, and it's better to move on to Java 11. > Improve Gradle Quick Start build.gradle with Better Gradle API Alternatives > --- > > Key: FLINK-34542 > URL: https://issues.apache.org/jira/browse/FLINK-34542 > Project: Flink > Issue Type: Improvement > Components: Documentation > Reporter: Lennon Yu > Priority: Minor > Labels: pull-request-available > > This is a ticket of misc. improvements to the build.gradle script provided at > {{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> > {{Getting Started}}: > * Add a {{mergeServiceFiles()}} call to the {{shadowJar}} configuration block > ** Its absence will cause class-not-found errors in SPI-related class > loading if the user has multiple connectors/formats in their implementation. > * Move the top-level {{mainClassName}} project property setting into > application { mainClass = 'foo.Bar' } > ** This is because the top-level mainClassName property will be deprecated > in Gradle 9.0+ > * Replace the use of the {{sourceCompatibility}} and {{targetCompatibility}} > properties with java { toolchain { languageVersion = > JavaLanguageVersion.of(11) } } > ** This is the way recommended by Gradle to streamline language version > configuration. > ** Also, the originally configured Java version - Java 8 - is getting close to > its terminal support phase, and it's better to move on to Java 11. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34542) Improve Gradle Quick Start build.gradle with Better Gradle API Alternatives
[ https://issues.apache.org/jira/browse/FLINK-34542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lennon Yu updated FLINK-34542: -- Description: This is a ticket of misc. improvements to the build.gradle script provided at {{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> {{Getting Started}}: * Add a {{mergeServiceFiles()}} call to the {{shadowJar}} configuration block ** Its absence will cause class-not-found errors in SPI-related class loading if the user has multiple connectors/formats in their implementation. * Move the top-level {{mainClassName}} project property setting into application { mainClass = 'foo.Bar' } ** This is because the top-level mainClassName property will be deprecated in Gradle 9.0+ * Replace the use of the {{sourceCompatibility}} and {{targetCompatibility}} properties with java { toolchain { languageVersion = JavaLanguageVersion.of(11) } } ** This is the way recommended by Gradle to streamline language version configuration. ** Also, the originally configured Java version - Java 8 - is getting close to its terminal support phase, and it's better to move on to Java 11. was: This is a ticket of misc. improvements to the build.gradle script provided at {{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> {{Getting Started}}: * Add a {{mergeServiceFiles()}} call to the {{shadowJar}} configuration block ** Its absence will cause class-not-found errors in SPI-related class loading if the user has multiple connectors/formats in their implementation. 
* Move the top-level {{mainClassName}} project property setting into application { mainClass = 'foo.Bar' } ** This is because the top-level mainClassName property will be deprecated in Gradle 9.0+ * Replace the use of the {{sourceCompatibility}} and {{targetCompatibility}} properties with java { toolchain { languageVersion = JavaLanguageVersion.of(17) } } ** This is the way recommended by Gradle to streamline language version configuration. > Improve Gradle Quick Start build.gradle with Better Gradle API Alternatives > --- > > Key: FLINK-34542 > URL: https://issues.apache.org/jira/browse/FLINK-34542 > Project: Flink > Issue Type: Improvement > Components: Documentation > Reporter: Lennon Yu > Priority: Minor > Labels: pull-request-available > > This is a ticket of misc. improvements to the build.gradle script provided at > {{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> > {{Getting Started}}: > * Add a {{mergeServiceFiles()}} call to the {{shadowJar}} configuration block > ** Its absence will cause class-not-found errors in SPI-related class > loading if the user has multiple connectors/formats in their implementation. > * Move the top-level {{mainClassName}} project property setting into > application { mainClass = 'foo.Bar' } > ** This is because the top-level mainClassName property will be deprecated > in Gradle 9.0+ > * Replace the use of the {{sourceCompatibility}} and {{targetCompatibility}} > properties with java { toolchain { languageVersion = > JavaLanguageVersion.of(11) } } > ** This is the way recommended by Gradle to streamline language version > configuration. > ** Also, the originally configured Java version - Java 8 - is getting close to > its terminal support phase, and it's better to move on to Java 11. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34542) Improve Gradle Quick Start build.gradle with Better Gradle API Alternatives
[ https://issues.apache.org/jira/browse/FLINK-34542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34542: --- Labels: pull-request-available (was: ) > Improve Gradle Quick Start build.gradle with Better Gradle API Alternatives > --- > > Key: FLINK-34542 > URL: https://issues.apache.org/jira/browse/FLINK-34542 > Project: Flink > Issue Type: Improvement > Components: Documentation > Reporter: Lennon Yu > Priority: Minor > Labels: pull-request-available > > This is a ticket of misc. improvements to the build.gradle script provided at > {{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> > {{Getting Started}}: > * Add a {{mergeServiceFiles()}} call to the {{shadowJar}} configuration block > ** Its absence will cause class-not-found errors in SPI-related class > loading if the user has multiple connectors/formats in their implementation. > * Move the top-level {{mainClassName}} project property setting into > application { mainClass = 'foo.Bar' } > ** This is because the top-level mainClassName property will be deprecated > in Gradle 9.0+ > * Replace the use of the {{sourceCompatibility}} and {{targetCompatibility}} > properties with java { toolchain { languageVersion = > JavaLanguageVersion.of(17) } } > ** This is the way recommended by Gradle to streamline language version > configuration. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-34542][docs] improved example quickstart build.gradle [flink]
LemonU opened a new pull request, #24416: URL: https://github.com/apache/flink/pull/24416 ## What is the purpose of the change To improve the `build.gradle` build script provided in the Flink documentation under section `Application Development >> Project Configuration >> Overview >> Getting Started`. ## Brief change log - Replaced some project setup calls that are about to be deprecated in newer Gradle versions. - Enabled service file merging in the `shadowJar ` extension to allow having multiple connector/format jars on the project classpath. ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
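The three changes this PR describes can be sketched as a build.gradle fragment. This is an illustrative sketch, not the PR's actual diff: the shadow-plugin version and the main class name are assumptions.

```groovy
// Sketch of the quickstart build.gradle changes described above.
plugins {
    id 'application'
    id 'com.github.johnrengelman.shadow' version '8.1.1'   // version is an assumption
}

application {
    // replaces the deprecated top-level mainClassName property
    mainClass = 'org.example.StreamingJob'                 // hypothetical main class
}

java {
    toolchain {
        // replaces sourceCompatibility/targetCompatibility
        languageVersion = JavaLanguageVersion.of(11)
    }
}

shadowJar {
    // merge META-INF/services files so SPI lookups (connector/format
    // factories) still resolve when several connector jars are bundled
    mergeServiceFiles()
}
```

Without `mergeServiceFiles()`, the shadow jar keeps only one `META-INF/services` file per service interface, so factories from all but one bundled connector silently disappear.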
Re: [PR] [FLINK-34535] Support JobPlanInfo for the explain result [flink]
jeyhunkarimov commented on code in PR #24411: URL: https://github.com/apache/flink/pull/24411#discussion_r1508294826 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala: ## @@ -1824,6 +1826,11 @@ object TableTestUtil { s.replaceAll("\"id\"\\s*:\\s*\\d+", "\"id\" : ").trim } + /** JobJson {jid} is ignored. */ + def replaceJobId(s: String): String = { +s.replaceAll("\"jid\"\\s*:\\s*\"[\\w.-]*\"", "\"jid\":\"\"").trim Review Comment: This might also replace a string literal having `"jid..."` pattern. We should make sure that only field values are targeted in this case ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala: ## @@ -1093,6 +1093,8 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) case ExplainDetail.ESTIMATED_COST => replaceEstimatedCost(result) case ExplainDetail.JSON_EXECUTION_PLAN => replaceNodeIdInOperator(replaceStreamNodeId(replaceStageId(result))) +case ExplainDetail.JSON_JOB_PLAN => Review Comment: I would leave the old test as it is, and add your cases in another test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
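To make the review concern concrete, here is a small self-contained Java version of the masking call from the diff (the original is Scala; the sample JSON is invented for the demo). As written, the pattern matches anywhere in the input, which is why the reviewer asks to ensure only the `jid` field value is targeted.

```java
import java.util.regex.Pattern;

// Demo of the jid-masking regex discussed in the review above.
class JidMaskDemo {
    // same pattern as the diff: "jid" key, optional whitespace, quoted value
    static final Pattern JID = Pattern.compile("\"jid\"\\s*:\\s*\"[\\w.-]*\"");

    static String mask(String json) {
        // blank out the job id so plan comparisons are deterministic
        return JID.matcher(json).replaceAll("\"jid\":\"\"").trim();
    }
}
```

A tighter variant could, for example, parse the JSON and rewrite only the top-level `jid` field instead of pattern-matching the raw string.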
[jira] [Updated] (FLINK-34491) Move from experimental support to production support for Java 17
[ https://issues.apache.org/jira/browse/FLINK-34491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dhruv Patel updated FLINK-34491: Description: This task is to move away from experimental support for Java 17 to production support so that teams running Flink in production can migrate to Java 17 successfully. *Background:* Flink supports the protobuf data format to exchange messages between different operators, and the serialization and deserialization of these protobuf messages are performed by a library called "Kryo". In order to move away from the experimental support for Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0, because Kryo 2.24.0 does not support Java 17. This improvement plan is tracked as part of this ticket [https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0] All Flink applications using protobuf currently generate state with Kryo v2. Once the above improvement plan is complete, all Flink applications will fully support reading that state and writing newer state with Kryo v5. However, the latest Kryo v5 doesn't support snapshots made by the previous Kryo v2. This will prevent applications which use the snapshot mechanism from deploying their jobs to the latest Flink version with Kryo v5 support without a bridge version running on Java 11. Applications will have to run on a bridge release version that reads their state as Kryo v2 data and writes it as Kryo v5 data before upgrading to a future version of Flink that completely drops support for Kryo v2. Basically, Flink applications using the protobuf data format cannot move directly from Java 8 to Java 17 without downtime after the Kryo v5 release in Flink. Applications will need to first move to Java 11 (the bridging version) and then move to Java 17 to have a safe deployment. 
*Blocker for this task:* Upgrade to Kryo 5.5.0, which supports Java 17, and a path for snapshot migration: https://issues.apache.org/jira/browse/FLINK-3154. was: This task is to move away from experimental support for Java 17 to production support so that teams running Flink in production can migrate to Java 17 successfully. *Background:* Flink supports the protobuf data format to exchange messages between different operators, and the serialization and deserialization of those protobuf messages are performed by a library called "Kryo". In order to move away from the experimental support for Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0, because Kryo 2.24.0 does not support Java 17. This improvement plan is tracked as part of this ticket [https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0] All Flink applications using protobuf currently generate state with Kryo v2. Once the above improvement plan is complete, all Flink applications will fully support reading that state and writing newer state with Kryo v5. However, the latest Kryo v5 doesn't support snapshots made by the previous Kryo v2. This will prevent applications which use the snapshot mechanism from deploying their jobs to the latest Flink version with Kryo v5 support without a bridge version running on Java 11. Applications will have to run on a bridge release version that reads their state as Kryo v2 data and writes it as Kryo v5 data before upgrading to a future version of Flink that completely drops support for Kryo v2. Basically, Flink applications using the protobuf data format cannot move directly from Java 8 to Java 17 without downtime after the Kryo v5 release in Flink. Applications will need to first move to Java 11 (the bridging version) and then move to Java 17 to have a safe deployment. 
*Blocker for this task:* Upgrade to Kryo 5.5.0, which supports Java 17, and a path for snapshot migration: https://issues.apache.org/jira/browse/FLINK-3154. > Move from experimental support to production support for Java 17 > > > Key: FLINK-34491 > URL: https://issues.apache.org/jira/browse/FLINK-34491 > Project: Flink > Issue Type: New Feature > Affects Versions: 1.18.1 > Reporter: Dhruv Patel > Priority: Major > > This task is to move away from experimental support for Java 17 to production > support so that teams running Flink in production can migrate to Java 17 > successfully. > *Background:* > Flink supports the protobuf data format to exchange messages between different > operators, and the serialization and deserialization of these protobuf > messages are performed by a library called "Kryo". In order to move away from > the experimental support for Java 17 released as part of Flink 1.18.1, the Kryo > library in Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo > 2.24.0 does not support Java
[jira] [Commented] (FLINK-29370) Protobuf in flink-sql-protobuf is not shaded
[ https://issues.apache.org/jira/browse/FLINK-29370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822311#comment-17822311 ] Jeyhun Karimov commented on FLINK-29370: Hi [~tanjialiang] you might need to consider [this comment|https://github.com/apache/flink/pull/14376#issuecomment-1164395312] before relocating protobuf > Protobuf in flink-sql-protobuf is not shaded > > > Key: FLINK-29370 > URL: https://issues.apache.org/jira/browse/FLINK-29370 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.16.0 >Reporter: Jark Wu >Priority: Major > > The protobuf classes in flink-sql-protobuf is not shaded which may lead to > class conflicts. Usually, sql jars should shade common used dependencies, > e.g. flink-sql-avro: > https://github.com/apache/flink/blob/master/flink-formats/flink-sql-avro/pom.xml#L88 > > {code} > ➜ Downloads jar tvf flink-sql-protobuf-1.16.0.jar | grep com.google > 0 Tue Sep 13 20:23:44 CST 2022 com/google/ > 0 Tue Sep 13 20:23:44 CST 2022 com/google/protobuf/ >568 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/ProtobufInternalUtils.class > 19218 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/AbstractMessage$Builder.class >259 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/AbstractMessage$BuilderParent.class > 10167 Tue Sep 13 20:23:44 CST 2022 com/google/protobuf/AbstractMessage.class > 1486 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/AbstractMessageLite$Builder$LimitedInputStream.class > 12399 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/AbstractMessageLite$Builder.class >279 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/AbstractMessageLite$InternalOneOfEnu > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
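The shading this ticket asks for is normally done with the maven-shade-plugin's relocation feature, as the referenced flink-sql-avro pom does. Below is a hedged sketch only: the shaded package prefix and plugin configuration are assumptions, not the actual flink-sql-protobuf pom, and the review comment linked above explains why naively relocating protobuf can break generated classes that reference `com.google.protobuf` directly.

```xml
<!-- Sketch: relocate bundled protobuf classes in a sql-format fat jar. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.google.protobuf</pattern>
            <!-- hypothetical shaded prefix -->
            <shadedPattern>org.apache.flink.formats.protobuf.shaded.com.google.protobuf</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

After such a relocation, `jar tvf` on the resulting jar would show the classes under the shaded prefix instead of `com/google/protobuf/`, avoiding conflicts with a user-provided protobuf.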
[jira] [Updated] (FLINK-34554) Using EXACTLY_ONCE with KafkaSink cause broker's OOM due to newly created transactionalId per checkpoint
[ https://issues.apache.org/jira/browse/FLINK-34554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hilmi Al Fatih updated FLINK-34554: --- Description: Flink version: 1.17.1 Kafka Broker version: 2.7.1 * 4 GB heap memory for each broker Hi, We recently had an outage in our production system after we performed a Flink kafka-connector API upgrade. To give a brief context, our application is a simple kafka-to-kafka pipeline with minimal processing. We run in EXACTLY_ONCE mode, so kafka transactions are involved. Our application runs with around 350 sink subtasks in total. The checkpoint period was set to 5 seconds to avoid blocking {{read_committed}} consumers for too long. We recently performed an upgrade with the following details: Previous state: * Flink version: 1.14.4 * Broker version: 2.7.1 * kafka connector API: FlinkKafkaProducer Update to: * Flink version: 1.17.1 * Broker version: 2.7.1 * kafka connector API: KafkaSink Around 10 hours after the deployment, our kafka brokers started failing with OOM errors. The heap dump entries are dominated by ProducerStateEntry records. Our investigation led us to the complete implementation change between FlinkKafkaProducer and KafkaSink: * KafkaSink generates a different transactionalId for each checkpoint, * FlinkKafkaProducer uses a constant pool of transactionalIds. With this behavior, KafkaSink exhausts our broker heap very fast, and a ProducerStateEntry only expires after transactional.id.expiration.ms, which by default is set to 7 days. 
([ref1|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/Log.scala#L677], [ref2|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L268], [ref3|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L1207]) For our job, this means it creates roughly: * over a 10-hour run: 350 ids * 12 checkpoints/minute * 60 min/hour * 10 hours ~ 2,520,000 entries * over 7 days: ~ 42 million entries. Attached below is the count of ProducerStateEntry instances in the heap dump at OOM: * 505,000 (6.5%); in total that would be roughly ~ 7,000,000 entries. Several things came to mind to mitigate the drawbacks, such as: * Reduce the number of subtasks, to reduce the number of transactionalIds * Lengthen the checkpoint period, to reduce the rate of newly generated transactionalIds * Shorten transactional.id.expiration.ms, to expire unused transactionalIds sooner * Increase the broker heap However, the above mitigations might be too cumbersome and need careful tuning, which harms our flexibility. In addition, due to the lack of maintained lingering transaction state, the TransactionAborter seems to abort old transactions naively. We might accidentally (or purposely) reuse the same transactionalIdPrefix and start the counter from 0. In that case, if an old transactionalId happens to have epoch >0, it will keep looping, aborting the nonexistent transactions up to the latest checkpoint counter (which may be too big), and make the job stuck. Btw, I am aware that in Flink 2.0 you are putting a lot of effort into creating better integration with Kafka transactions ([FLIP-319|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710]). FLIP-319 mentions something about TID pooling. 
However, it seems there is no relevant page for it yet, so I wonder whether there is already a concrete plan that I can follow; if there is something I can contribute to, I will be really happy to help.
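The growth estimate in the report above can be reproduced with straightforward arithmetic. A stand-alone sketch (the subtask count and checkpoint rate are taken from the report; the one-id-per-checkpoint model is simplified):

```python
# Rough model of broker-side ProducerStateEntry accumulation when every
# checkpoint creates a fresh transactionalId per sink subtask.
SUBTASKS = 350            # sink parallelism reported in the issue
CHECKPOINTS_PER_MIN = 12  # 5-second checkpoint interval

def entries_after(hours: float) -> int:
    """Number of new transactionalIds (and hence ProducerStateEntry
    records) created after `hours` of continuous running."""
    return int(SUBTASKS * CHECKPOINTS_PER_MIN * 60 * hours)

ten_hours = entries_after(10)           # ~2.52 million entries
one_expiration = entries_after(24 * 7)  # ~42 million entries accumulate before
                                        # the 7-day default expiration kicks in
```

These reproduce the ~2,520,000 and ~42 million figures quoted above: until transactional.id.expiration.ms elapses, nothing bounds the broker-side state growth.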
[jira] [Updated] (FLINK-34554) Using EXACTLY_ONCE with KafkaSink cause broker's OOM due to newly created transactionalId per checkpoint
[ https://issues.apache.org/jira/browse/FLINK-34554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hilmi Al Fatih updated FLINK-34554: --- Priority: Blocker (was: Minor) > Using EXACTLY_ONCE with KafkaSink cause broker's OOM due to newly created > transactionalId per checkpoint > > > Key: FLINK-34554 > URL: https://issues.apache.org/jira/browse/FLINK-34554 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kafka > Affects Versions: 1.16.3, 1.17.2, 1.18.1 > Reporter: Hilmi Al Fatih > Priority: Blocker > Fix For: 1.16.3, 1.17.2, 1.18.1 > > Attachments: image (4).png, image (5).png > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
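The "looping aborter" failure mode discussed in this thread can be illustrated with a small stand-alone simulation. The id layout and helper names below are hypothetical, chosen only to show why restarting the counter at 0 forces a walk over every historical id:

```python
# Sketch: when transactionalIds are derived from (prefix, subtask, counter),
# aborting "all possibly lingering" transactions after a restart means
# probing every counter value from 0 up to the latest checkpoint counter.
def transactional_id(prefix: str, subtask: int, counter: int) -> str:
    # Hypothetical id layout, for illustration only.
    return f"{prefix}-{subtask}-{counter}"

def ids_to_abort(prefix: str, subtasks: int, latest_counter: int):
    """Enumerate every id a naive aborter would try to abort."""
    for counter in range(latest_counter + 1):
        for subtask in range(subtasks):
            yield transactional_id(prefix, subtask, counter)

# Tiny demo: 3 subtasks, counters 0..2 -> 9 ids to probe. At the scale in
# this report (350 subtasks, tens of thousands of checkpoints), the same
# loop would probe tens of millions of mostly nonexistent transactions.
probe = list(ids_to_abort("myPrefix", 3, 2))
```

The loop itself is cheap; the cost in the real job is one abort round-trip to the broker per probed id, which is what makes the job appear stuck.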
[jira] [Updated] (FLINK-34554) Using EXACTLY_ONCE with KafkaSink cause broker's OOM due to newly created transactionalId per checkpoint
[ https://issues.apache.org/jira/browse/FLINK-34554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hilmi Al Fatih updated FLINK-34554: --- Issue Type: Improvement (was: New Feature) > Using EXACTLY_ONCE with KafkaSink cause broker's OOM due to newly created > transactionalId per checkpoint > > > Key: FLINK-34554 > URL: https://issues.apache.org/jira/browse/FLINK-34554 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka > Affects Versions: 1.16.3, 1.17.2, 1.18.1 > Reporter: Hilmi Al Fatih > Priority: Blocker > Fix For: 1.16.3, 1.17.2, 1.18.1 > > Attachments: image (4).png, image (5).png > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34554) Using EXACTLY_ONCE with KafkaSink cause broker's OOM due to newly created transactionalId per checkpoint
Hilmi Al Fatih created FLINK-34554: -- Summary: Using EXACTLY_ONCE with KafkaSink cause broker's OOM due to newly created transactionalId per checkpoint Key: FLINK-34554 URL: https://issues.apache.org/jira/browse/FLINK-34554 Project: Flink Issue Type: New Feature Components: Connectors / Kafka Affects Versions: 1.18.1, 1.17.2, 1.16.3 Reporter: Hilmi Al Fatih Fix For: 1.18.1, 1.17.2, 1.16.3 Attachments: image (4).png, image (5).png -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34553) Time travel support by Flink catalogs
Mehmet Aktas created FLINK-34553: Summary: Time travel support by Flink catalogs Key: FLINK-34553 URL: https://issues.apache.org/jira/browse/FLINK-34553 Project: Flink Issue Type: Technical Debt Reporter: Mehmet Aktas I am trying to add time travel support for the Flink backend in [Ibis|https://github.com/ibis-project/ibis]. I found that Flink requires the {{catalog}} to implement {{getTable(ObjectPath tablePath, long timestamp)}} for time travel support: Attention: Currently, time travel requires the corresponding catalog that the table belongs to implementing the getTable(ObjectPath tablePath, long timestamp) method. See more details in Catalog. [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/time-travel/] The default {{GenericInMemoryCatalog}} does not seem to implement {{getTable()}}. I set up a {{hive metastore}} and created a {{hive catalog}}, but it turns out that the hive catalog also does not implement {{getTable()}} -- I wish the Flink docs were more detailed about this ... py4j.protocol.Py4JJavaError: An error occurred while calling o8.sqlQuery. : org.apache.flink.table.api.ValidationException: SQL validation failed. getTable(ObjectPath, long) is not implemented for class org.apache.flink.table.catalog.hive.HiveCatalog. I have two options now to continue with this: * Try another catalog, like the Iceberg catalog, to see if it implements {{getTable()}} ** I am not able to find information on whether a given catalog implements this method. Should I dig into their source code to figure this out, or is there another way to find out? * Implement a custom catalog with {{getTable()}}, following the *very-brief-instructions* given in the [Flink doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#interface-in-catalog-for-supporting-time-travel]. 
Is there a doc, article, mailing list, or anything else that I can use to get more information on: * Which catalogs implement time travel support? * If there is no readily available catalog supporting time travel, how can we implement a custom catalog with time travel support? ** The instructions given [here|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#interface-in-catalog-for-supporting-time-travel] are unfortunately not sufficient for a Java-illiterate person like myself. -- This message was sent by Atlassian Jira (v8.20.10#820010)
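The contract behind {{getTable(ObjectPath tablePath, long timestamp)}} is essentially "return the table version that was current at the given point in time", i.e. a latest-snapshot-at-or-before lookup. A self-contained sketch of that lookup, with a hypothetical in-memory snapshot store standing in for a real metastore (Flink's actual Catalog is a Java interface; this only illustrates the semantics):

```python
import bisect

# Hypothetical snapshot store: per table, (commit_time_millis, version)
# pairs kept sorted by commit time, as a metastore would.
SNAPSHOTS = {
    "db.orders": [(1_000, "v1"), (5_000, "v2"), (9_000, "v3")],
}

def get_table(table_path: str, timestamp: int) -> str:
    """Return the table version current at `timestamp`: the latest
    commit whose commit_time <= timestamp."""
    commits = SNAPSHOTS[table_path]
    times = [t for t, _ in commits]
    idx = bisect.bisect_right(times, timestamp) - 1
    if idx < 0:
        raise ValueError("table did not exist at that timestamp")
    return commits[idx][1]
```

A catalog that supports Flink time travel performs the equivalent lookup against its metastore; catalogs that don't override the timestamped variant raise the ValidationException quoted above.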
Re: [PR] [FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]
JustinLeesin commented on PR #24397: URL: https://github.com/apache/flink/pull/24397#issuecomment-1971516132 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]
JustinLeesin commented on PR #24397: URL: https://github.com/apache/flink/pull/24397#issuecomment-1971488431 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]
JustinLeesin commented on code in PR #24397: URL: https://github.com/apache/flink/pull/24397#discussion_r1507837138 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java: ## @@ -210,6 +214,31 @@ void testNamedArgumentsWithOptionalArguments() { ResolvedSchema.of(Column.physical("result", DataTypes.STRING()))); } +@Test +void testEnvironmentConf() throws DatabaseAlreadyExistException { +// root conf should work +Configuration configuration = new Configuration(); +configuration.setString("key1", "value1"); +StreamExecutionEnvironment env = +StreamExecutionEnvironment.getExecutionEnvironment(configuration); +StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); +TestProcedureCatalogFactory.CatalogWithBuiltInProcedure procedureCatalog = +new TestProcedureCatalogFactory.CatalogWithBuiltInProcedure("procedure_catalog"); +procedureCatalog.createDatabase( +"system", new CatalogDatabaseImpl(Collections.emptyMap(), null), true); +tableEnv.registerCatalog("test_p", procedureCatalog); +tableEnv.useCatalog("test_p"); Review Comment: OK, I have refined the test case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34552) Support message deduplication for input data sources
[ https://issues.apache.org/jira/browse/FLINK-34552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822189#comment-17822189 ] Sergey Anokhovskiy commented on FLINK-34552: Thank you for the comment [~martijnvisser] {quote}> produces messages with duplicate payload within PK That basically conflicts with the entire premise of Flink where a primary key constraint is a hint for Flink to leverage for optimizations. {quote} I'm not sure I understood your message. Here is an example of the message sequence for a kafka topic with duplicates: (key1, Refresh, payload1), (key2, Refresh, payload2), (key1, Refresh, payload1), (key2, Refresh, payload3) {quote} But how would it then work at the source? Aren't you just moving the problem from somewhere else in the logic to the front of the logic, since then the source would have to keep this interval there? You will still encounter a large delay in that case. {quote} As mentioned in the description of the ticket, there are different strategies to deduplicate a data stream. For my use case, I'm particularly interested in the last one, where the job keeps hashes of the last message for each PK and filters out messages if the payload & message_type have not changed. To some extent I agree with you that it's a question about the best place for this logic. I had to create a separate service; however, I'd like to see it as a parameterized feature in the Table API. {quote}Kafka topics a and b are CDC data streams and contain DELETE and REFRESH messages. Are you actually using the upsert-kafka source, and treat these input sources as a changelog stream? {quote} The main point was that the ROW_NUMBER approach doesn't work for CDC streams due to its inability to take message_type into account. {quote}I would like to understand as well how you would propose to have this logic expressed? Is it still SQL? Is it connector parameters? 
{quote} More as a connector parameter > Support message deduplication for input data sources > > > Key: FLINK-34552 > URL: https://issues.apache.org/jira/browse/FLINK-34552 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API, Table SQL / Runtime > Reporter: Sergey Anokhovskiy > Priority: Major > > My main proposal is to have duplicate message suppression logic as part of > the Flink Table API, to be able to suppress duplicates from the input sources. It > might be a parameter provided by the user, indicating whether they want to suppress duplicates > from the input source or not. Below I provide more details about my use case > and the available approaches. > > I have a Flink job which reads from two keyed kafka topics and emits messages > to a keyed kafka topic. The Flink job executes the join query: > SELECT a.id, adata, bdata > FROM a > JOIN b > ON a.id = b.id > > One of the input kafka topics produces messages with duplicate payload within > a PK in addition to meaningful data. That causes duplicates in the output topic > and creates extra load on the downstream services. > > I was looking for a way to suppress duplicates and I found two strategies > which don't seem to work for my use case: > # Based on a deduplication window, such as the kafka [sink > buffer|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L46]. > The deduplication window doesn't work well for my case because > the interval between duplicates is one day, and I don't want my data to be > delayed by using such a big window. > > # Using > [ROW_NUMBER|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/deduplication/]. > Unfortunately, this approach doesn't suit my use case either. Kafka topics > a and b are CDC data streams and contain DELETE and REFRESH messages. If > DELETE and REFRESH messages come with the same payload, the job will > suppress the last message, which will lead to an incorrect output result. If > I add message_type to the PARTITION key, then the job will not be able to > process message sequences like this: DELETE->REFRESH->DELETE (with the same > payload and PK), because the last message will be suppressed, which will lead > to an incorrect output result. > > Finally, I had to create a separate custom Flink service which reads the > output topic of the initial job and suppresses duplicates, keeping hashes of > the last processed message for each PK in the Flink state. -- This message was sent by Atlassian Jira (v8.20.10#820010)
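The hash-based strategy described in this comment -- keep a digest of the last (message_type, payload) per primary key and drop a record only when nothing changed -- can be sketched stand-alone. A plain dict stands in for Flink keyed state here; in a real job this would be per-key state in a KeyedProcessFunction:

```python
import hashlib

# Last-seen digest per primary key. In a Flink job this would be
# per-key ValueState inside a KeyedProcessFunction, not a module dict.
_last_digest: dict = {}

def is_duplicate(pk: str, msg_type: str, payload: str) -> bool:
    """True if (msg_type, payload) is unchanged since the last message
    for this key. Because msg_type participates in the digest, a
    DELETE->REFRESH->DELETE run with identical payloads is preserved."""
    digest = hashlib.sha256(f"{msg_type}|{payload}".encode()).hexdigest()
    if _last_digest.get(pk) == digest:
        return True
    _last_digest[pk] = digest
    return False

stream = [
    ("key1", "REFRESH", "p1"),
    ("key2", "REFRESH", "p2"),
    ("key1", "REFRESH", "p1"),  # exact repeat -> dropped
    ("key2", "REFRESH", "p3"),  # payload changed -> kept
    ("key1", "DELETE",  "p1"),  # type changed -> kept
]
kept = [m for m in stream if not is_duplicate(*m)]
```

Note that unlike ROW_NUMBER deduplication, this suppresses only consecutive exact repeats per key, which is what the CDC use case above needs.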
Re: [PR] [FLINK-34522][core] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter [flink]
HuangXingBo commented on PR #24387: URL: https://github.com/apache/flink/pull/24387#issuecomment-1971427877 @1996fanrui The test failure is actually related to the implementation of `Duration`. Currently, `Duration` is a wrapper for the Java class in the JVM created by the `gateway`. If `Duration` is used as a default parameter, it moves the creation of the `gateway` JVM forward to definition time. The crux of the problem is that the test environment variables set in `TestBase`, and the jar packages added via those environment variables, are then no longer effective. This results in the launched gateway not loading many flink test jars, so we find that the test class is not found (there won't be a problem in production code; it only affects tests, because we add some test jars to the classpath during testing). The perfect solution is to change the `Duration` implementation, making `Duration` usable as a default parameter without affecting test passing. But this change would be significant: every part that uses `Duration` would need to be considered. In the short term, I believe we just need to stop using `Duration` as the default parameter: represent the default with `None`, then check in the method whether the parameter is `None`, similar to the implementation below. ``` def cleanup_in_rocksdb_compact_filter( self, query_time_after_num_entries, periodic_compaction_time=None) -> 'StateTtlConfig.Builder': self._strategies[ StateTtlConfig.CleanupStrategies.Strategies.ROCKSDB_COMPACTION_FILTER] = \ StateTtlConfig.CleanupStrategies.RocksdbCompactFilterCleanupStrategy( query_time_after_num_entries, periodic_compaction_time if periodic_compaction_time else Duration.of_days(30)) return self ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
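The `None`-sentinel pattern recommended above can be demonstrated without PyFlink or Py4J. `make_default()` is an illustrative stand-in for `Duration.of_days(30)`, whose evaluation in PyFlink has the side effect of launching the JVM gateway:

```python
calls = []

def make_default():
    # Stand-in for Duration.of_days(30): in PyFlink, evaluating this
    # touches the Py4J gateway and launches the JVM as a side effect.
    calls.append("default built")
    return 30

# BAD: the default expression runs once, at function definition time,
# before any test setup has had a chance to configure the environment.
def eager(days=make_default()):
    return days

# GOOD: nothing runs until a caller actually omits the argument.
def lazy(days=None):
    return days if days is not None else make_default()
```

Defining `eager` already triggers `make_default()`, which mirrors how importing a module whose signature defaults to `Duration.of_days(30)` would launch the gateway prematurely; `lazy` defers the side effect to call time.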
[jira] [Comment Edited] (FLINK-34552) Support message deduplication for input data sources
[ https://issues.apache.org/jira/browse/FLINK-34552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822172#comment-17822172 ] Martijn Visser edited comment on FLINK-34552 at 2/29/24 3:13 PM: - A couple of points from my point of view: {quote}produces messages with duplicate payload within PK{quote} That basically conflicts with the entire premise of Flink where a primary key constraint is a hint for Flink to leverage for optimizations. {quote}The Deduplication window doesn't work well for my case because the interval between duplicates is one day and I don't want my data to be delayed if I use such a big window.{quote} But how would it then work at the source? Aren't you just moving the problem from somewhere else in the logic to the front of the logic, since then the source would have to keep this interval there? You will still encounter a large delay in that case. {quote}Kafka topics a and b are CDC data streams and contain DELETE and REFRESH messages. {quote} Are you actually using the upsert-kafka source, and treat these input sources as a changelog stream? I would like to understand as well how you would propose to have this logic expressed? Is it still SQL? Is it connector parameters? was (Author: martijnvisser): A couple of points from my point of view: {quote}produces messages with duplicate payload within PK{quote} That basically conflicts with the entire premise of Flink where a primary key constraint is a hint for Flink to leverage for optimizations. {quote}The Deduplication window doesn't work well for my case because the interval between duplicates is one day and I don't want my data to be delayed if I use such a big window.{quote} But how would it then work at the source? Aren't you just moving the problem from somewhere else in the logic to the front of the logic, since then the source would have to keep this interval there? You will still encounter a large delay in that case. 
{quote}Kafka topics a and b are CDC data streams and contain DELETE and REFRESH messages. {quote} Are you actually using the upsert-kafka source, and treat these input sources as a changelog stream? > Support message deduplication for input data sources > > > Key: FLINK-34552 > URL: https://issues.apache.org/jira/browse/FLINK-34552 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API, Table SQL / Runtime >Reporter: Sergey Anokhovskiy >Priority: Major > > My main proposal is to have duplicate message suppression logic as a part of > Flink Table API to be able to suppress duplicates from the input sources. It > might be a parameter provided by the user if they want to suppress duplicates > from the input source or not. Below I provided more details about my use case > and available approaches. > > I have a Flink job which reads from two keyed kafka topics and emits messages > to the keyed kafka topic. The Flink job executes the join query: > SELECT a.id, adata, bdata > FROM a > JOIN b > ON a.id = b.id > > One of the input kafka topics produces messages with duplicate payload within > PK in addition to meaningful data. That causes duplicates in the output topic > and creates extra load to the downstream services. > > I was looking for a way to suppress duplicates and I found two strategies > which don't seem to work for my use case: > # Based on the deduplication window as a kafka [sink > buffer|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L46] > for example. The Deduplication window doesn't work well for my case because > the interval between duplicates is one day and I don't want my data to be > delayed if I use such a big window. > > # Using > [ROW_NUMBER|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/deduplication/] > . Unfortunately, this approach doesn't suit my use case either. 
Kafka topics > a and b are CDC data streams and contain DELETE and REFRESH messages. If > DELETE and REFRESH messages are coming with the same payload the job will > suppress the last message which will lead to the incorrect output result. If > I add message_type to the PARTITION key then the job will not be able to > process message sequences like this: DELETE->REFRESH->DELETE (with the same > payload and PK), because the last message will be suppressed which will lead > to the incorrect output result. > > Finally, I had to create a separate custom Flink service which reads the > output topic of the initial job and suppresses duplicates keeping hashes of > the last processed message for each PK in the Flink state. -- This message was sent by Atlassian Jira (v8.20.10#820010)
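The workaround described in the issue (a downstream service that keeps a hash of the last processed message per PK and suppresses repeats) can be sketched in a few lines. This is an illustrative simulation in plain Python, not the actual service or any Flink API; in the real service the `state` dict would live in Flink state:

```python
import hashlib

def dedup(stream, state=None):
    """Suppress repeated payloads per primary key.

    `stream` is an iterable of (pk, payload) pairs; `state` maps each PK to
    the hash of the last payload emitted for it (a plain dict here, Flink
    keyed state in the real service).
    """
    state = {} if state is None else state
    for pk, payload in stream:
        digest = hashlib.sha256(payload.encode()).hexdigest()
        if state.get(pk) != digest:
            # Payload changed for this PK -> emit and remember its hash.
            state[pk] = digest
            yield pk, payload
        # Otherwise: identical payload for this PK -> suppressed.

messages = [("k1", "a"), ("k1", "a"), ("k1", "b"), ("k2", "a"), ("k1", "b")]
deduped = list(dedup(messages))
```

Unlike a time-based deduplication window, this suppresses duplicates regardless of the interval between them (one day in the reporter's case), at the cost of one stored hash per primary key.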
Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]
morazow commented on code in PR #182: URL: https://github.com/apache/flink-docker/pull/182#discussion_r1507732247 ## .github/workflows/ci.yml: ## @@ -17,14 +17,22 @@ name: "CI" on: [push, pull_request] +env: + TAR_URL: "https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz" + jobs: ci: +name: CI using JDK ${{ matrix.java_version }} runs-on: ubuntu-latest +strategy: + fail-fast: false + max-parallel: 1 Review Comment: > > #11 8.399 + /bin/bash /opt/flink/bin/config-parser-utils.sh /opt/flink/conf /opt/flink/bin /opt/flink/lib -repKV rest.address,localhost,0.0.0.0 -repKV rest.bind-address,localhost,0.0.0.0 -repKV jobmanager.bind-host,localhost,0.0.0.0 -repKV taskmanager.bind-host,localhost,0.0.0.0 -rmKV taskmanager.host=localhost > > #11 8.625 sed: can't read /config.yaml: No such file or directory > > I forgot that we're also doing testing. Maybe there's something odd with the config modification logic (see [workflow log](https://github.com/morazow/flink-docker/actions/runs/8081181748/job/22079149028#step:4:8838))? Yes, the tests have these kinds of issues. Here are more: ``` WARNING: attempted to load jemalloc from /usr/lib/x86_64-linux-gnu/libjemalloc.so but the library couldn't be found. glibc will be used instead. ./docker-entrypoint.sh: line 80: /home/runner/work/flink-docker/flink-docker/testing/bin/config-parser-utils.sh: No such file or directory jemalloc was enabled but it was not found in the system. LD_PRELOAD is unchanged and glibc will be used instead. ./docker-entrypoint.sh: line 80: /home/runner/work/flink-docker/flink-docker/testing/bin/config-parser-utils.sh: No such file or directory ``` But that line still appears even when the build is green. Here is a successful run: https://github.com/morazow/flink-docker/actions/runs/8096433961/job/22125972520 -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-29370) Protobuf in flink-sql-protobuf is not shaded
[ https://issues.apache.org/jira/browse/FLINK-29370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822169#comment-17822169 ] tanjialiang commented on FLINK-29370: - [~jark] [~libenchao] [~maosuhan] I found that flink-sql-orc has the protobuf dependency without shading, and it conflicts with flink-sql-protobuf; my Flink version is 1.16.1. For now, the temporary solution is to shade the protobuf dependency in both flink-sql-protobuf and my user proto classes myself. > Protobuf in flink-sql-protobuf is not shaded > > > Key: FLINK-29370 > URL: https://issues.apache.org/jira/browse/FLINK-29370 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.16.0 >Reporter: Jark Wu >Priority: Major > > The protobuf classes in flink-sql-protobuf are not shaded, which may lead to > class conflicts. Usually, sql jars should shade commonly used dependencies, > e.g. flink-sql-avro: > https://github.com/apache/flink/blob/master/flink-formats/flink-sql-avro/pom.xml#L88 > > {code} > ➜ Downloads jar tvf flink-sql-protobuf-1.16.0.jar | grep com.google > 0 Tue Sep 13 20:23:44 CST 2022 com/google/ > 0 Tue Sep 13 20:23:44 CST 2022 com/google/protobuf/ >568 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/ProtobufInternalUtils.class > 19218 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/AbstractMessage$Builder.class >259 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/AbstractMessage$BuilderParent.class > 10167 Tue Sep 13 20:23:44 CST 2022 com/google/protobuf/AbstractMessage.class > 1486 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/AbstractMessageLite$Builder$LimitedInputStream.class > 12399 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/AbstractMessageLite$Builder.class >279 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/AbstractMessageLite$InternalOneOfEnu > {code}
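Shading as done in flink-sql-avro means relocating the `com.google.protobuf` packages with the maven-shade-plugin so a second protobuf copy on the classpath cannot clash. A minimal sketch of such a relocation follows; the target package name is an assumption modeled on Flink's shading convention, not the actual flink-sql-protobuf pom:

```xml
<!-- Hypothetical maven-shade-plugin excerpt: relocate protobuf classes
     into a module-private package to avoid classpath conflicts. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.google.protobuf</pattern>
            <!-- assumed target package, following the flink-sql-avro style -->
            <shadedPattern>org.apache.flink.formats.protobuf.shaded.com.google.protobuf</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

With a relocation like this, `jar tvf` on the sql jar would list the relocated package instead of bare `com/google/protobuf/` entries.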
[jira] [Comment Edited] (FLINK-34552) Support message deduplication for input data sources
[ https://issues.apache.org/jira/browse/FLINK-34552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822161#comment-17822161 ] Ashish Khatkar edited comment on FLINK-34552 at 2/29/24 2:50 PM: - Adding an example for this. Consider two tables: Table A Fields : {A, B, C, D} Table B Fields : {A, E, F, G} Query : Select A, B, F from table A join table b on field A Consider a case where the join will contain 1B records for \{A, B, C, D, E, F, G} but the number of unique records for the fields we are interested in is 1M \{A, B, F}. Now any change that happens in fields C, D, E, G is going to produce records -U\{A, B, F} +U\{A, B, F} (as the join will emit a changelog stream), but effectively the records which we are interested in haven't changed. was (Author: akhatkar): Adding an example for this consider two tables Table A Fields : {A, B, C, D} Table B Fields : {A, E, F, G} Query : Select A, B, F from table A join table b on field A consider a case where the join will contain 1B records for \{A, B, C, D, E, F, G} but number of unique records for fields we are interested in are 1M \{A, B, F}. Now any change that happens in fields C, D, E, G is going to produce records -U\{A, B, F} +U\{A, B, F} (as the join will emit changelog stream) but effectively the records hasn’t changed.
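The redundant-update pattern described in the example above can be simulated: the join emits a -U/+U pair for every upstream change, but when the projected fields {A, B, F} are unchanged the pair carries no information and can be suppressed by comparing the old and new projected rows. A minimal sketch in plain Python (not Flink API; field values follow the example):

```python
def suppress_no_op_updates(changelog):
    """Drop -U/+U pairs whose projected payload did not change.

    `changelog` is a list of (kind, row) entries where kind is one of
    "+I", "-U", "+U" and row is the projected tuple (A, B, F).
    """
    out = []
    i = 0
    while i < len(changelog):
        kind, row = changelog[i]
        # A retraction immediately followed by an update with the same
        # projected payload is a no-op pair -> skip both records.
        if (kind == "-U" and i + 1 < len(changelog)
                and changelog[i + 1][0] == "+U"
                and changelog[i + 1][1] == row):
            i += 2
            continue
        out.append(changelog[i])
        i += 1
    return out

events = [("+I", ("a1", "b1", "f1")),
          ("-U", ("a1", "b1", "f1")),  # C changed upstream: projection unchanged
          ("+U", ("a1", "b1", "f1")),
          ("-U", ("a1", "b1", "f1")),  # F really changed
          ("+U", ("a1", "b1", "f2"))]
result = suppress_no_op_updates(events)
```

In the 1B-records scenario from the comment, suppressing such no-op pairs would shrink the output stream toward the 1M genuinely distinct {A, B, F} records.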
Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on code in PR #24398: URL: https://github.com/apache/flink/pull/24398#discussion_r1507354167 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/AbstractSortPartitionOperator.java: ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorAttributes; +import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +/** + * The {@link AbstractSortPartitionOperator} is the base class of sort partition operator, which + * provides shared construction methods and utility functions. + * + * @param The type of input record. + * @param The type used to sort the records, which may be different from the INPUT_TYPE. 
+ * For example, if the input record is sorted according to the selected key by {@link + * KeySelector}, the selected key should also be written to {@link ExternalSorter} with the + * input record to avoid repeated key selections. In this case, the type used to sort the records + * will be a tuple containing both the selected key and record. + */ +@Internal +@SuppressWarnings("unchecked") +public abstract class AbstractSortPartitionOperator +extends AbstractStreamOperator +implements OneInputStreamOperator, BoundedOneInput { + +/** The default manage memory weight of sort partition operator. */ +public static final int DEFAULT_MANAGE_MEMORY_WEIGHT = 128; Review Comment: I kept it the same as the default managed memory weight value of the batch keyed sorter. Developers can use `getTransformation().declareManagedMemoryUseCaseAtOperatorScope()` to set the memory usage themselves. Adding a separate configuration item for a DataStream API would be a little complicated. WDYT? ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/AbstractSortPartitionOperator.java: ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package
[jira] [Comment Edited] (FLINK-34552) Support message deduplication for input data sources
[ https://issues.apache.org/jira/browse/FLINK-34552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822161#comment-17822161 ] Ashish Khatkar edited comment on FLINK-34552 at 2/29/24 2:49 PM: - Adding an example for this consider two tables Table A Fields : {A, B, C, D} Table B Fields : {A, E, F, G} Query : Select A, B, F from table A join table b on field A consider a case where the join will contain 1B records for \{A, B, C, D, E, F, G} but number of unique records for fields we are interested in are 1M \{A, B, F}. Now any change that happens in fields C, D, E, G is going to produce records -U\{A, B, F} +U\{A, B, F} (as the join will emit changelog stream) but effectively the records hasn’t changed. was (Author: akhatkar): Adding an example for this consider two tables Table A Fields : {A, B, C, D} Table B Fields : {A, E, F, G} Query : Select A, B, F from table A join table b on field A consider a case where the join will contain 1B records for {A, B, C, D, E, F, G} but number of unique records for fields we are interested in are 1M {A, B, F}. now any change that happens in fields C, D, E, G is going to produce records -U{A, B, F} +U{A, B, F} (as the join will emit changelog stream) but effectively the records hasn’t changed.
Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on code in PR #24398: URL: https://github.com/apache/flink/pull/24398#discussion_r1507354390 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java: ## @@ -1071,6 +1077,110 @@ protected SingleOutputStreamOperator aggregate(AggregationFunction aggrega return reduce(aggregate).name("Keyed Aggregation"); } +@Override +public PartitionWindowedStream fullWindowPartition() { +throw new UnsupportedOperationException( +"KeyedStream doesn't support processing non-keyed partitions."); Review Comment: The note has been fixed. ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/AbstractSortPartitionOperator.java: ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorAttributes; +import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +/** + * The {@link AbstractSortPartitionOperator} is the base class of sort partition operator, which + * provides shared construction methods and utility functions. + * + * @param The type of input record. + * @param The type used to sort the records, which may be different from the INPUT_TYPE. 
+ * For example, if the input record is sorted according to the selected key by {@link + * KeySelector}, the selected key should also be written to {@link ExternalSorter} with the + * input record to avoid repeated key selections. In this case, the type used to sort the records + * will be a tuple containing both the selected key and record. + */ +@Internal +@SuppressWarnings("unchecked") +public abstract class AbstractSortPartitionOperator Review Comment: Done.
[jira] [Updated] (FLINK-34552) Support message deduplication for input data sources
[ https://issues.apache.org/jira/browse/FLINK-34552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Anokhovskiy updated FLINK-34552: --- Description: My main proposal is to have duplicate message suppression logic as a part of Flink Table API to be able to suppress duplicates from the input sources. It might be a parameter provided by the user if they want to suppress duplicates from the input source or not. Below I provided more details about my use case and available approaches. I have a Flink job which reads from two keyed kafka topics and emits messages to the keyed kafka topic. The Flink job executes the join query: SELECT a.id, adata, bdata FROM a JOIN b ON a.id = b.id One of the input kafka topics produces messages with duplicate payload within PK in addition to meaningful data. That causes duplicates in the output topic and creates extra load to the downstream services. I was looking for a way to suppress duplicates and I found two strategies which don't seem to work for my use case: # Based on the deduplication window as a kafka [sink buffer|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L46] for example. The Deduplication window doesn't work well for my case because the interval between duplicates is one day and I don't want my data to be delayed if I use such a big window. # Using [ROW_NUMBER|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/deduplication/] . Unfortunately, this approach doesn't suit my use case either. Kafka topics a and b are CDC data streams and contain DELETE and REFRESH messages. If DELETE and REFRESH messages are coming with the same payload the job will suppress the last message which will lead to the incorrect output result. 
If I add message_type to the PARTITION key then the job will not be able to process message sequences like this: DELETE->REFRESH->DELETE (with the same payload and PK), because the last message will be suppressed which will lead to the incorrect output result. Finally, I had to create a separate custom Flink service which reads the output topic of the initial job and suppresses duplicates keeping hashes of the last processed message for each PK in the Flink state. was: My main proposal is: To have duplicate message suppression logic as a part of flink table api to be able to suppress duplicates from the input sources. It might be a parameter provided by user if they want to suppress duplicates from the input source or not. Below I provided more details about my use case and available approaches. I have a flink job which reads from two keyed kafka topics and emits messages to the keyed kafka topic. The flink job executes the join query: SELECT a.id, adata, bdata FROM a JOIN b ON a.id = b.id One of the input kafka topics produces messages with duplicate payload within PK in addition to meaningful data. That causes duplicates in the output topic and creates extra load to the downstream services. I was looking for a way to suppress duplicates and I found two strategies which don't seem to work for my use case: 1. Based on deduplication window as kafka sink buffer for example https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L46 Deduplication window doesn't work well for my case because the interval between duplicates is one day and I don't want my data to be delayed if I use such a big window. 2. Using ROW_NUMBER https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/deduplication/ . Unfortunately, this approach doesn't suit my use case either. 
Kafka topics a and b are CDC data streams and contain DELETE and REFRESH messages. If DELETE and REFRESH messages are coming with the same payload the job will suppress the last message which will lead to the incorrect output result. If I add message_type to the PARTITION key then the job will not be able to process message sequences like this: DELETE->REFRESH->DELETE (with the same payload and PK), because the last message will be suppressed which will lead to the incorrect output result. Finally, I had to create a separate custom Flink service which reads the output topic of the initial job and suppresses duplicates keeping message hashes in the state. The initial join job, described above, still has to process duplicates. Would it be better to be able to suppress duplicates from the input sources? > Support message deduplication for input data sources > > > Key: FLINK-34552 > URL: https://issues.apache.org/jira/browse/FLINK-34552 > Project: Flink >
[jira] [Created] (FLINK-34551) Align retry mechanisms of FutureUtils
Matthias Pohl created FLINK-34551: - Summary: Align retry mechanisms of FutureUtils Key: FLINK-34551 URL: https://issues.apache.org/jira/browse/FLINK-34551 Project: Flink Issue Type: Technical Debt Components: API / Core Affects Versions: 1.20.0 Reporter: Matthias Pohl The retry mechanisms of FutureUtils include quite a bit of redundant code which makes it hard to understand and to extend. The logic should be aligned properly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
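The consolidation the ticket asks for could converge on one future-based retry entry point that the delay, deadline, and predicate variants all delegate to. The following is a minimal, hypothetical sketch of that idea — the class and method names are illustrative and are not FutureUtils' actual API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch of a single generic retry helper; FutureUtils' real
// API differs, this only illustrates the "one shared code path" idea.
class FutureRetryUtil {

    /**
     * Runs the async operation and, on failure, retries it up to
     * {@code retries} more times before failing the returned future.
     */
    static <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> operation, int retries) {
        CompletableFuture<T> result = new CompletableFuture<>();
        operation
                .get()
                .whenComplete(
                        (value, error) -> {
                            if (error == null) {
                                result.complete(value);
                            } else if (retries > 0) {
                                // Recurse with one fewer attempt left and
                                // forward the eventual outcome.
                                retry(operation, retries - 1)
                                        .whenComplete(
                                                (v, e) -> {
                                                    if (e == null) {
                                                        result.complete(v);
                                                    } else {
                                                        result.completeExceptionally(e);
                                                    }
                                                });
                            } else {
                                result.completeExceptionally(error);
                            }
                        });
        return result;
    }
}
```

Retry-with-delay, retry-with-deadline, and retryable-exception variants could then be thin wrappers around this one code path instead of parallel implementations.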
[jira] [Created] (FLINK-34552) Support message deduplication for input data sources
Sergey Anokhovskiy created FLINK-34552: -- Summary: Support message deduplication for input data sources Key: FLINK-34552 URL: https://issues.apache.org/jira/browse/FLINK-34552 Project: Flink Issue Type: New Feature Components: Table SQL / API, Table SQL / Runtime Reporter: Sergey Anokhovskiy My main proposal is: To have duplicate message suppression logic as a part of the Flink Table API to be able to suppress duplicates from the input sources. It might be a parameter provided by the user if they want to suppress duplicates from the input source or not. Below I provided more details about my use case and available approaches. I have a Flink job which reads from two keyed kafka topics and emits messages to the keyed kafka topic. The Flink job executes the join query: SELECT a.id, adata, bdata FROM a JOIN b ON a.id = b.id One of the input kafka topics produces messages with duplicate payload within PK in addition to meaningful data. That causes duplicates in the output topic and creates extra load to the downstream services. I was looking for a way to suppress duplicates and I found two strategies which don't seem to work for my use case: 1. Based on the deduplication window as a kafka sink buffer, for example https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L46 The deduplication window doesn't work well for my case because the interval between duplicates is one day and I don't want my data to be delayed if I use such a big window. 2. Using ROW_NUMBER https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/deduplication/ . Unfortunately, this approach doesn't suit my use case either. Kafka topics a and b are CDC data streams and contain DELETE and REFRESH messages. If DELETE and REFRESH messages are coming with the same payload the job will suppress the last message which will lead to the incorrect output result. 
If I add message_type to the PARTITION key then the job will not be able to process message sequences like this: DELETE->REFRESH->DELETE (with the same payload and PK), because the last message will be suppressed which will lead to the incorrect output result. Finally, I had to create a separate custom Flink service which reads the output topic of the initial job and suppresses duplicates keeping message hashes in the state. The initial join job, described above, still has to process duplicates. Would it be better to be able to suppress duplicates from the input sources? -- This message was sent by Atlassian Jira (v8.20.10#820010)
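The workaround described above — remembering a hash of the last processed message per primary key — can be sketched as follows. This is an illustrative, self-contained version: the class and method names are hypothetical, and a real Flink implementation would keep the map in keyed state rather than on the heap.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the per-PK "last message hash" suppression the
// reporter implemented as a separate service; this is not a Flink API.
class LastValueDeduplicator {

    // In a real Flink job this would live in keyed state, not a heap map.
    private final Map<String, Integer> lastHashByKey = new HashMap<>();

    /**
     * Returns true if the record should be emitted, false if its payload
     * matches the previous payload seen for the same primary key.
     */
    boolean offer(String primaryKey, String payload) {
        int hash = payload.hashCode();
        Integer previous = lastHashByKey.put(primaryKey, hash);
        return previous == null || previous != hash;
    }
}
```

Because only the immediately preceding payload per key is remembered, a sequence like DELETE->REFRESH->DELETE with identical payloads still passes through — the case the ROW_NUMBER approach mishandles — while back-to-back exact duplicates are suppressed.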
Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]
morazow commented on code in PR #182: URL: https://github.com/apache/flink-docker/pull/182#discussion_r1507649411 ## .github/workflows/ci.yml: ## @@ -17,14 +17,22 @@ name: "CI" on: [push, pull_request] +env: + TAR_URL: "https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz" + jobs: ci: +name: CI using JDK ${{ matrix.java_version }} runs-on: ubuntu-latest +strategy: + fail-fast: false + max-parallel: 1 Review Comment: Read about the GHA concurrency: https://docs.github.com/en/actions/using-jobs/using-concurrency#using-concurrency-in-different-scenarios My assumption is that I pushed another commit while another workflow was running and maybe they clashed on the container name. But I tried to trigger several workflow runs, still could not reproduce the issue above. I would suggest adding a concurrency definition to our matrix builds: ``` concurrency: group: ${{ github.workflow }}-${{ github.ref }}-java-${{ matrix.java_version }} cancel-in-progress: true ``` This way, if commits are pushed one after the other, only the last commit will run the workflow. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]
XComp commented on code in PR #182: URL: https://github.com/apache/flink-docker/pull/182#discussion_r1507624836 ## .github/workflows/ci.yml: ## @@ -17,14 +17,22 @@ name: "CI" on: [push, pull_request] +env: + TAR_URL: "https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz" + jobs: ci: +name: CI using JDK ${{ matrix.java_version }} runs-on: ubuntu-latest +strategy: + fail-fast: false + max-parallel: 1 Review Comment: > #11 8.399 + /bin/bash /opt/flink/bin/config-parser-utils.sh /opt/flink/conf /opt/flink/bin /opt/flink/lib -repKV rest.address,localhost,0.0.0.0 -repKV rest.bind-address,localhost,0.0.0.0 -repKV jobmanager.bind-host,localhost,0.0.0.0 -repKV taskmanager.bind-host,localhost,0.0.0.0 -rmKV taskmanager.host=localhost > #11 8.625 sed: can't read /config.yaml: No such file or directory I forgot that we're also doing testing. Maybe there's something odd with the config modification logic (see [workflow log](https://github.com/morazow/flink-docker/actions/runs/8081181748/job/22079149028#step:4:8838))? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-34550) attempted task still report metric of currentEmitEventTimeLag
siwei.gao created FLINK-34550: - Summary: attempted task still report metric of currentEmitEventTimeLag Key: FLINK-34550 URL: https://issues.apache.org/jira/browse/FLINK-34550 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.17.1 Environment: flink version:1.17.1 kafka-connector:1.17.1 Reporter: siwei.gao Attachments: image-2024-02-29-21-41-01-709.png, image-2024-02-29-21-43-18-340.png, image-2024-02-29-21-50-55-160.png An attempted task still reports the currentEmitEventTimeLag metric when using the kafka-connector, so metrics are reported for multiple tasks with the same subtask_index but different task_attempt_num within the same time period. !image-2024-02-29-21-43-18-340.png! Only the metric whose task_attempt_num is 4 should be reported. This condition occurs in a taskmanager with multiple slots and does not happen when the taskmanager has only one slot. !image-2024-02-29-21-50-55-160.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33436] Documentation for the built-in Profiler [flink]
yuchen-ecnu commented on PR #24403: URL: https://github.com/apache/flink/pull/24403#issuecomment-1971160199 Hi @rmetzger, thanks for the quick and detailed comments! async-profiler supports most platforms, but not all, so adding a "Requirements" section is a good suggestion, and I've added it in the latest commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]
morazow commented on code in PR #182: URL: https://github.com/apache/flink-docker/pull/182#discussion_r1507558823 ## .github/workflows/ci.yml: ## @@ -17,14 +17,22 @@ name: "CI" on: [push, pull_request] +env: + TAR_URL: "https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz; + jobs: ci: +name: CI using JDK ${{ matrix.java_version }} runs-on: ubuntu-latest +strategy: + fail-fast: false + max-parallel: 1 Review Comment: But the rerun of the job is fine: https://github.com/morazow/flink-docker/actions/runs/8081181748 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-34529) Projection cannot be pushed down through rank operator.
[ https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822118#comment-17822118 ] Benchao Li edited comment on FLINK-34529 at 2/29/24 1:08 PM: - [~xuyangzhong] [~nilerzhou] Thank you for the explanation, it helps. I would prefer putting these transposing rules all in the "LOGICAL" stage, since in this stage we are using a cost-based planner. I'm wondering if it's really necessary to have some transposing rules (now only {{{}CalcRankTransposeRule{}}}) in the "LOGICAL_REWRITE" stage; could you check whether we still need {{CalcRankTransposeRule}} in "LOGICAL_REWRITE" after introducing {{ProjectWindowTransposeRule}} in the "LOGICAL" stage? What's more, I'm even wondering whether we really need {{{}CalcRankTransposeRule{}}} at all. {{Rank}} is a special form of {{{}Window{}}}, so {{ProjectWindowTransposeRule}} should supersede {{{}CalcRankTransposeRule{}}}. (By saying "transposing rules", usually I would expect these rules to only generate more plan alternatives, and the cost-based planner chooses which is better via cost. That's why you can see many counter pairs of rules like {{AggregateFilterTransposeRule}} and {{FilterAggregateTransposeRule}} in Calcite) was (Author: libenchao): [~xuyangzhong][~nilerzhou] Thank you for the explanation, it helps. I would prefer to putting these transposing rules all in "LOGICAL" stage, since in this stage we are using cost-based planner. I'm wondering if it's really necessary to have some transposing rules (now only {{CalcJoinTransposeRule}}) in "LOGICAL_REWRITE" stage, could you check whether we still needs {{CalcJoinTransposeRule}} in "LOGICAL_REWRITE" after introducing {{ProjectWindowTransposeRule}} in "LOGICAL" stage? What's more, I'm even wondering that if we really needs {{CalcJoinTransposeRule}}. {{Rank}} is a special form of {{Window}}, so {{ProjectWindowTransposeRule}} should supersede {{CalcJoinTransposeRule}}. 
(By saying "transposing rules", usually I would expect these rules are only generating more plan alternatives, cost-based planner chooses which is is better via cost. That's why you can see many counter pairs of rules like {{AggregateFilterTransposeRule}} and {{FilterAggregateTransposeRule}} in Calcite) > Projection cannot be pushed down through rank operator. > --- > > Key: FLINK-34529 > URL: https://issues.apache.org/jira/browse/FLINK-34529 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: yisha zhou >Assignee: yisha zhou >Priority: Major > > When there is a rank/deduplicate operator, the projection based on output of > this operator cannot be pushed down to the input of it. > The following code can help reproducing the issue: > {code:java} > val util = streamTestUtil() > util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c) > util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f) > val sql = > """ > |SELECT a FROM ( > | SELECT a, f, > | ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num > | FROM T1, T2 > | WHERE T1.a = T2.d > |) > |WHERE rank_num = 1 > """.stripMargin > util.verifyPlan(sql){code} > The plan is expected to be: > {code:java} > Calc(select=[a]) > +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], > select=[a, c, f]) >+- Exchange(distribution=[hash[f]]) > +- Calc(select=[a, c, f]) > +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], > leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[hash[a]]) > : +- Calc(select=[a, c]) > : +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > +- Exchange(distribution=[hash[d]]) >+- Calc(select=[d, f]) > +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, 
e, f]) > {code} > Notice that the 'select' of Join operator is [a, c, d, f]. However the actual > plan is: > {code:java} > Calc(select=[a]) > +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], > select=[a, c, f]) >+- Exchange(distribution=[hash[f]]) > +- Calc(select=[a, c, f]) > +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, > e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[hash[a]]) > : +- LegacyTableSourceScan(table=[[default_catalog, >
[jira] [Commented] (FLINK-34529) Projection cannot be pushed down through rank operator.
[ https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822118#comment-17822118 ] Benchao Li commented on FLINK-34529: [~xuyangzhong][~nilerzhou] Thank you for the explanation, it helps. I would prefer putting these transposing rules all in the "LOGICAL" stage, since in this stage we are using a cost-based planner. I'm wondering if it's really necessary to have some transposing rules (now only {{CalcJoinTransposeRule}}) in the "LOGICAL_REWRITE" stage; could you check whether we still need {{CalcJoinTransposeRule}} in "LOGICAL_REWRITE" after introducing {{ProjectWindowTransposeRule}} in the "LOGICAL" stage? What's more, I'm even wondering whether we really need {{CalcJoinTransposeRule}} at all. {{Rank}} is a special form of {{Window}}, so {{ProjectWindowTransposeRule}} should supersede {{CalcJoinTransposeRule}}. (By saying "transposing rules", usually I would expect these rules to only generate more plan alternatives, and the cost-based planner chooses which is better via cost. That's why you can see many counter pairs of rules like {{AggregateFilterTransposeRule}} and {{FilterAggregateTransposeRule}} in Calcite) > Projection cannot be pushed down through rank operator. > --- > > Key: FLINK-34529 > URL: https://issues.apache.org/jira/browse/FLINK-34529 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: yisha zhou >Priority: Major > > When there is a rank/deduplicate operator, the projection based on output of > this operator cannot be pushed down to the input of it. 
> The following code can help reproducing the issue: > {code:java} > val util = streamTestUtil() > util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c) > util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f) > val sql = > """ > |SELECT a FROM ( > | SELECT a, f, > | ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num > | FROM T1, T2 > | WHERE T1.a = T2.d > |) > |WHERE rank_num = 1 > """.stripMargin > util.verifyPlan(sql){code} > The plan is expected to be: > {code:java} > Calc(select=[a]) > +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], > select=[a, c, f]) >+- Exchange(distribution=[hash[f]]) > +- Calc(select=[a, c, f]) > +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], > leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[hash[a]]) > : +- Calc(select=[a, c]) > : +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > +- Exchange(distribution=[hash[d]]) >+- Calc(select=[d, f]) > +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) > {code} > Notice that the 'select' of Join operator is [a, c, d, f]. 
However the actual > plan is: > {code:java} > Calc(select=[a]) > +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], > select=[a, c, f]) >+- Exchange(distribution=[hash[f]]) > +- Calc(select=[a, c, f]) > +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, > e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[hash[a]]) > : +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > +- Exchange(distribution=[hash[d]]) >+- LegacyTableSourceScan(table=[[default_catalog, > default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) > {code} > the 'select' of Join operator is [a, b, c, d, e, f], which means the > projection in the final Calc is not passed through the Rank. > And I think an easy way to fix this issue is to add > org.apache.calcite.rel.rules.ProjectWindowTransposeRule into > FlinkStreamRuleSets.LOGICAL_OPT_RULES. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34529) Projection cannot be pushed down through rank operator.
[ https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822119#comment-17822119 ] Benchao Li commented on FLINK-34529: [~nilerzhou] Assigned to you~ > Projection cannot be pushed down through rank operator. > --- > > Key: FLINK-34529 > URL: https://issues.apache.org/jira/browse/FLINK-34529 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: yisha zhou >Assignee: yisha zhou >Priority: Major > > When there is a rank/deduplicate operator, the projection based on output of > this operator cannot be pushed down to the input of it. > The following code can help reproducing the issue: > {code:java} > val util = streamTestUtil() > util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c) > util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f) > val sql = > """ > |SELECT a FROM ( > | SELECT a, f, > | ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num > | FROM T1, T2 > | WHERE T1.a = T2.d > |) > |WHERE rank_num = 1 > """.stripMargin > util.verifyPlan(sql){code} > The plan is expected to be: > {code:java} > Calc(select=[a]) > +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], > select=[a, c, f]) >+- Exchange(distribution=[hash[f]]) > +- Calc(select=[a, c, f]) > +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], > leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[hash[a]]) > : +- Calc(select=[a, c]) > : +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > +- Exchange(distribution=[hash[d]]) >+- Calc(select=[d, f]) > +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) > {code} > Notice that the 'select' of Join operator is [a, c, d, f]. 
However the actual > plan is: > {code:java} > Calc(select=[a]) > +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], > select=[a, c, f]) >+- Exchange(distribution=[hash[f]]) > +- Calc(select=[a, c, f]) > +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, > e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[hash[a]]) > : +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > +- Exchange(distribution=[hash[d]]) >+- LegacyTableSourceScan(table=[[default_catalog, > default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) > {code} > the 'select' of Join operator is [a, b, c, d, e, f], which means the > projection in the final Calc is not passed through the Rank. > And I think an easy way to fix this issue is to add > org.apache.calcite.rel.rules.ProjectWindowTransposeRule into > FlinkStreamRuleSets.LOGICAL_OPT_RULES. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34529) Projection cannot be pushed down through rank operator.
[ https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Benchao Li reassigned FLINK-34529: -- Assignee: yisha zhou > Projection cannot be pushed down through rank operator. > --- > > Key: FLINK-34529 > URL: https://issues.apache.org/jira/browse/FLINK-34529 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: yisha zhou >Assignee: yisha zhou >Priority: Major > > When there is a rank/deduplicate operator, the projection based on output of > this operator cannot be pushed down to the input of it. > The following code can help reproducing the issue: > {code:java} > val util = streamTestUtil() > util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c) > util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f) > val sql = > """ > |SELECT a FROM ( > | SELECT a, f, > | ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num > | FROM T1, T2 > | WHERE T1.a = T2.d > |) > |WHERE rank_num = 1 > """.stripMargin > util.verifyPlan(sql){code} > The plan is expected to be: > {code:java} > Calc(select=[a]) > +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], > select=[a, c, f]) >+- Exchange(distribution=[hash[f]]) > +- Calc(select=[a, c, f]) > +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], > leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[hash[a]]) > : +- Calc(select=[a, c]) > : +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > +- Exchange(distribution=[hash[d]]) >+- Calc(select=[d, f]) > +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) > {code} > Notice that the 'select' of Join operator is [a, c, d, f]. 
However the actual > plan is: > {code:java} > Calc(select=[a]) > +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], > select=[a, c, f]) >+- Exchange(distribution=[hash[f]]) > +- Calc(select=[a, c, f]) > +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, > e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[hash[a]]) > : +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > +- Exchange(distribution=[hash[d]]) >+- LegacyTableSourceScan(table=[[default_catalog, > default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) > {code} > the 'select' of Join operator is [a, b, c, d, e, f], which means the > projection in the final Calc is not passed through the Rank. > And I think an easy way to fix this issue is to add > org.apache.calcite.rel.rules.ProjectWindowTransposeRule into > FlinkStreamRuleSets.LOGICAL_OPT_RULES. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34546] Emit span with failure labels on failure. [flink]
pnowojski commented on code in PR #24414: URL: https://github.com/apache/flink/pull/24414#discussion_r1507543874 ## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java: ## @@ -141,6 +155,51 @@ private CompletableFuture> labelFailure(Throwable cause, boo return FailureEnricherUtils.labelFailure(cause, ctx, mainThreadExecutor, failureEnrichers); } +private FailureHandlingResult handleFailureAndReport( +@Nullable final Execution failedExecution, +final Throwable cause, +long timestamp, +final Set verticesToRestart, +final boolean globalFailure) { + +FailureHandlingResult failureHandlingResult = +handleFailure(failedExecution, cause, timestamp, verticesToRestart, globalFailure); + +if (reportEventsAsSpans) { +// TODO: replace with reporting as event once events are supported. +// Add reporting as callback for when the failure labeling is completed. +failureHandlingResult +.getFailureLabels() +.thenAcceptAsync( +labels -> reportFailureHandling(failureHandlingResult, labels), +mainThreadExecutor); +} + +return failureHandlingResult; +} + +private void reportFailureHandling( +FailureHandlingResult failureHandlingResult, Map failureLabels) { + +// Add base attributes +SpanBuilder spanBuilder = +Span.builder(ExecutionFailureHandler.class, "JobFailure") +.setStartTsMillis(failureHandlingResult.getTimestamp()) +.setEndTsMillis(failureHandlingResult.getTimestamp()) +.setAttribute( +"canRestart", String.valueOf(failureHandlingResult.canRestart())) +.setAttribute( +"isGlobalFailure", + String.valueOf(failureHandlingResult.isGlobalFailure())); Review Comment: I see. I think it would be still better to document it but I don't have strong preferences. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34546] Emit span with failure labels on failure. [flink]
StefanRRichter commented on code in PR #24414:
URL: https://github.com/apache/flink/pull/24414#discussion_r1507540785

## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java:

```
@@ -141,6 +155,51 @@ private CompletableFuture<Map<String, String>> labelFailure(Throwable cause, boo
         return FailureEnricherUtils.labelFailure(cause, ctx, mainThreadExecutor, failureEnrichers);
     }
 
+    private FailureHandlingResult handleFailureAndReport(
+            @Nullable final Execution failedExecution,
+            final Throwable cause,
+            long timestamp,
+            final Set<ExecutionVertexID> verticesToRestart,
+            final boolean globalFailure) {
+
+        FailureHandlingResult failureHandlingResult =
+                handleFailure(failedExecution, cause, timestamp, verticesToRestart, globalFailure);
+
+        if (reportEventsAsSpans) {
+            // TODO: replace with reporting as event once events are supported.
+            // Add reporting as callback for when the failure labeling is completed.
+            failureHandlingResult
+                    .getFailureLabels()
+                    .thenAcceptAsync(
+                            labels -> reportFailureHandling(failureHandlingResult, labels),
+                            mainThreadExecutor);
+        }
+
+        return failureHandlingResult;
+    }
+
+    private void reportFailureHandling(
+            FailureHandlingResult failureHandlingResult, Map<String, String> failureLabels) {
+
+        // Add base attributes
+        SpanBuilder spanBuilder =
+                Span.builder(ExecutionFailureHandler.class, "JobFailure")
+                        .setStartTsMillis(failureHandlingResult.getTimestamp())
+                        .setEndTsMillis(failureHandlingResult.getTimestamp())
+                        .setAttribute(
+                                "canRestart", String.valueOf(failureHandlingResult.canRestart()))
+                        .setAttribute(
+                                "isGlobalFailure",
+                                String.valueOf(failureHandlingResult.isGlobalFailure()));
```

Review Comment:
   That's what I had before, but since it's not enabled by default right now I'd suggest we document this once it is reported as event by default.
Re: [PR] [FLINK-34546] Emit span with failure labels on failure. [flink]
pnowojski commented on code in PR #24414:
URL: https://github.com/apache/flink/pull/24414#discussion_r1507538335

## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java:

```
@@ -141,6 +155,51 @@ private CompletableFuture<Map<String, String>> labelFailure(Throwable cause, boo
         return FailureEnricherUtils.labelFailure(cause, ctx, mainThreadExecutor, failureEnrichers);
     }
 
+    private FailureHandlingResult handleFailureAndReport(
+            @Nullable final Execution failedExecution,
+            final Throwable cause,
+            long timestamp,
+            final Set<ExecutionVertexID> verticesToRestart,
+            final boolean globalFailure) {
+
+        FailureHandlingResult failureHandlingResult =
+                handleFailure(failedExecution, cause, timestamp, verticesToRestart, globalFailure);
+
+        if (reportEventsAsSpans) {
+            // TODO: replace with reporting as event once events are supported.
+            // Add reporting as callback for when the failure labeling is completed.
+            failureHandlingResult
+                    .getFailureLabels()
+                    .thenAcceptAsync(
+                            labels -> reportFailureHandling(failureHandlingResult, labels),
+                            mainThreadExecutor);
+        }
+
+        return failureHandlingResult;
+    }
+
+    private void reportFailureHandling(
+            FailureHandlingResult failureHandlingResult, Map<String, String> failureLabels) {
+
+        // Add base attributes
+        SpanBuilder spanBuilder =
+                Span.builder(ExecutionFailureHandler.class, "JobFailure")
+                        .setStartTsMillis(failureHandlingResult.getTimestamp())
+                        .setEndTsMillis(failureHandlingResult.getTimestamp())
+                        .setAttribute(
+                                "canRestart", String.valueOf(failureHandlingResult.canRestart()))
+                        .setAttribute(
+                                "isGlobalFailure",
+                                String.valueOf(failureHandlingResult.isGlobalFailure()));
```

Review Comment:
   Can you add this span to the documentation of reported spans?
Re: [PR] [FLINK-34546] Emit span with failure labels on failure. [flink]
StefanRRichter commented on code in PR #24414:
URL: https://github.com/apache/flink/pull/24414#discussion_r1507536006

## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java:

```
@@ -141,6 +150,48 @@ private CompletableFuture<Map<String, String>> labelFailure(Throwable cause, boo
         return FailureEnricherUtils.labelFailure(cause, ctx, mainThreadExecutor, failureEnrichers);
     }
 
+    private FailureHandlingResult handleFailureAndReport(
+            @Nullable final Execution failedExecution,
+            final Throwable cause,
+            long timestamp,
+            final Set<ExecutionVertexID> verticesToRestart,
+            final boolean globalFailure) {
+
+        FailureHandlingResult failureHandlingResult =
+                handleFailure(failedExecution, cause, timestamp, verticesToRestart, globalFailure);
+
+        // Add reporting as callback for when the failure labeling is completed.
+        failureHandlingResult
+                .getFailureLabels()
+                .thenAcceptAsync(
+                        labels -> reportFailureHandling(failureHandlingResult, labels),
+                        mainThreadExecutor);
+
+        return failureHandlingResult;
+    }
+
+    private void reportFailureHandling(
+            FailureHandlingResult failureHandlingResult, Map<String, String> failureLabels) {
+
+        // Add base attributes
+        SpanBuilder spanBuilder =
+                Span.builder(ExecutionFailureHandler.class, "HandleFailure")
+                        .setStartTsMillis(failureHandlingResult.getTimestamp())
+                        .setEndTsMillis(System.currentTimeMillis())
+                        .setAttribute(
+                                "canRestart", String.valueOf(failureHandlingResult.canRestart()))
+                        .setAttribute(
+                                "isGlobalFailure",
+                                String.valueOf(failureHandlingResult.isGlobalFailure()));
+
+        // Add all failure labels
+        for (Map.Entry<String, String> entry : failureLabels.entrySet()) {
+            spanBuilder.setAttribute(
+                    FAILURE_LABEL_ATTRIBUTE_PREFIX + entry.getKey(), entry.getValue());
+        }
+        metricGroup.addSpan(spanBuilder);
```

Review Comment:
   Yes, that's what I did :)
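The callback wiring discussed in this thread — report only after the failure-labels future completes, on a designated executor — can be sketched with plain JDK futures. Note that `LabeledResult`, `reportWhenLabeled`, and the string-based attribute formatting below are hypothetical stand-ins for Flink's `FailureHandlingResult` and `SpanBuilder`, not the actual PR code:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical stand-in for FailureHandlingResult: exposes the async labels.
class LabeledResult {
    private final CompletableFuture<Map<String, String>> failureLabels;

    LabeledResult(CompletableFuture<Map<String, String>> failureLabels) {
        this.failureLabels = failureLabels;
    }

    CompletableFuture<Map<String, String>> getFailureLabels() {
        return failureLabels;
    }
}

public class FailureReportSketch {
    static final String FAILURE_LABEL_ATTRIBUTE_PREFIX = "failureLabel.";

    // Mirrors the PR's pattern: attach reporting as a callback that fires
    // once failure labeling completes, executed on the given executor
    // (in Flink this would be the main-thread executor).
    static CompletableFuture<String> reportWhenLabeled(LabeledResult result, Executor executor) {
        return result.getFailureLabels()
                .thenApplyAsync(
                        labels -> {
                            StringBuilder span = new StringBuilder("JobFailure");
                            labels.forEach(
                                    (k, v) ->
                                            span.append(' ')
                                                    .append(FAILURE_LABEL_ATTRIBUTE_PREFIX + k)
                                                    .append('=')
                                                    .append(v));
                            return span.toString();
                        },
                        executor);
    }

    public static void main(String[] args) {
        LabeledResult result =
                new LabeledResult(CompletableFuture.completedFuture(Map.of("type", "SYSTEM")));
        // Runnable::run executes the callback inline, keeping the demo deterministic.
        String span = reportWhenLabeled(result, Runnable::run).join();
        System.out.println(span); // JobFailure failureLabel.type=SYSTEM
    }
}
```

The key design point this illustrates is that the failure handler can return its result immediately while the (potentially slow) enricher-provided labels are attached to the report asynchronously.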
Re: [PR] [FLINK-34546] Emit span with failure labels on failure. [flink]
pnowojski commented on code in PR #24414:
URL: https://github.com/apache/flink/pull/24414#discussion_r1507534949

## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java:

```
@@ -141,6 +150,48 @@ private CompletableFuture<Map<String, String>> labelFailure(Throwable cause, boo
         return FailureEnricherUtils.labelFailure(cause, ctx, mainThreadExecutor, failureEnrichers);
     }
 
+    private FailureHandlingResult handleFailureAndReport(
+            @Nullable final Execution failedExecution,
+            final Throwable cause,
+            long timestamp,
+            final Set<ExecutionVertexID> verticesToRestart,
+            final boolean globalFailure) {
+
+        FailureHandlingResult failureHandlingResult =
+                handleFailure(failedExecution, cause, timestamp, verticesToRestart, globalFailure);
+
+        // Add reporting as callback for when the failure labeling is completed.
+        failureHandlingResult
+                .getFailureLabels()
+                .thenAcceptAsync(
+                        labels -> reportFailureHandling(failureHandlingResult, labels),
+                        mainThreadExecutor);
+
+        return failureHandlingResult;
+    }
+
+    private void reportFailureHandling(
+            FailureHandlingResult failureHandlingResult, Map<String, String> failureLabels) {
+
+        // Add base attributes
+        SpanBuilder spanBuilder =
+                Span.builder(ExecutionFailureHandler.class, "HandleFailure")
+                        .setStartTsMillis(failureHandlingResult.getTimestamp())
+                        .setEndTsMillis(System.currentTimeMillis())
+                        .setAttribute(
+                                "canRestart", String.valueOf(failureHandlingResult.canRestart()))
+                        .setAttribute(
+                                "isGlobalFailure",
+                                String.valueOf(failureHandlingResult.isGlobalFailure()));
+
+        // Add all failure labels
+        for (Map.Entry<String, String> entry : failureLabels.entrySet()) {
+            spanBuilder.setAttribute(
+                    FAILURE_LABEL_ATTRIBUTE_PREFIX + entry.getKey(), entry.getValue());
+        }
+        metricGroup.addSpan(spanBuilder);
```

Review Comment:
   As discussed offline: for the time being, let's report failures via spans, but hidden behind a temporary feature toggle like "use spans instead of events". This option should be disabled by default and marked as deprecated from the beginning. 
   Once Flink supports events, we can remove that option after a release or so.
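Such a toggle could be declared with Flink's ConfigOptions builder; the option key, field name, and description below are illustrative guesses, not necessarily what the PR actually adds:

```java
// Hypothetical sketch of the temporary feature toggle discussed above.
// The key and description are illustrative; disabled by default and
// deprecated from the start, to be removed once events are supported.
@Deprecated
public static final ConfigOption<Boolean> REPORT_EVENTS_AS_SPANS =
        ConfigOptions.key("traces.report-events-as-spans")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Whether to report events as spans until native event reporting is supported.");
```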