[jira] [Assigned] (FLINK-34958) Add support Flink 1.20-SNAPSHOT and bump flink-connector-parent to 1.1.0 for mongodb connector
[ https://issues.apache.org/jira/browse/FLINK-34958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Leonard Xu reassigned FLINK-34958:
----------------------------------
    Assignee: Zhongqiang Gong

> Add support Flink 1.20-SNAPSHOT and bump flink-connector-parent to 1.1.0 for mongodb connector
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34958
>                 URL: https://issues.apache.org/jira/browse/FLINK-34958
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / MongoDB
>            Reporter: Zhongqiang Gong
>            Assignee: Zhongqiang Gong
>            Priority: Minor
>
> Changes:
> * Add support Flink 1.20-SNAPSHOT
> * Bump flink-connector-parent to 1.1.0
[jira] [Commented] (FLINK-34958) Add support Flink 1.20-SNAPSHOT and bump flink-connector-parent to 1.1.0 for mongodb connector
[ https://issues.apache.org/jira/browse/FLINK-34958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831632#comment-17831632 ]

Zhongqiang Gong commented on FLINK-34958:
-----------------------------------------

[~Leonard] I'm willing to take this.

> Add support Flink 1.20-SNAPSHOT and bump flink-connector-parent to 1.1.0 for mongodb connector
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34958
>                 URL: https://issues.apache.org/jira/browse/FLINK-34958
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / MongoDB
>            Reporter: Zhongqiang Gong
>            Priority: Minor
>
> Changes:
> * Add support Flink 1.20-SNAPSHOT
> * Bump flink-connector-parent to 1.1.0
[jira] [Created] (FLINK-34958) Add support Flink 1.20-SNAPSHOT and bump flink-connector-parent to 1.1.0 for mongodb connector
Zhongqiang Gong created FLINK-34958:
---------------------------------------

             Summary: Add support Flink 1.20-SNAPSHOT and bump flink-connector-parent to 1.1.0 for mongodb connector
                 Key: FLINK-34958
                 URL: https://issues.apache.org/jira/browse/FLINK-34958
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / MongoDB
            Reporter: Zhongqiang Gong

Changes:
* Add support Flink 1.20-SNAPSHOT
* Bump flink-connector-parent to 1.1.0
Re: [PR] [FLINK-34934] Translation Flink-Kubernetes-Operator document framework construction [flink-kubernetes-operator]
caicancai commented on PR #805:
URL: https://github.com/apache/flink-kubernetes-operator/pull/805#issuecomment-2024447717

> Thanks @caicancai for the contribution and test!
>
> I ran hugo locally and tried both the English and Chinese docs; they work well.
>
> But I noticed #807 is merged and this PR doesn't include it for the Chinese doc. The changes need to be synced from the English doc to the Chinese doc.
>
> After that, this PR is fine for me.

Thank you for the reminder, updated.
Re: [PR] [FLINK-34906] Only scale when all tasks are running [flink-kubernetes-operator]
gyfora commented on PR #801:
URL: https://github.com/apache/flink-kubernetes-operator/pull/801#issuecomment-2024422087

Sounds good @1996fanrui
Re: [PR] [FLINK-34924][table] Support partition pushdown for join queries [flink]
libenchao commented on PR #24559:
URL: https://github.com/apache/flink/pull/24559#issuecomment-2024407235

The optimization sounds like a good idea. I'll try to review it in the coming days. Thank you for the PR!
[jira] [Commented] (FLINK-34656) Generated code for `ITEM` operator should return null when getting element of a null map/array/row
[ https://issues.apache.org/jira/browse/FLINK-34656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831623#comment-17831623 ]

Benchao Li commented on FLINK-34656:
------------------------------------

[~nilerzhou] Thank you for the test case, it helps a lot; we should indeed fix this. The problem seems related to the type {{ARRAY(BIGINT.notNull()).nullable()}}: although the inner type is {{NOT NULL}}, the outer type is {{NULLABLE}}. There is a similar discussion in FLINK-31830 (and a corresponding discussion thread: https://lists.apache.org/thread/fzrfc9c3rtgw761ofdydl0q96km558q7). Since this is a codegen issue which does not affect the user interface, we can fix it directly.

> Generated code for `ITEM` operator should return null when getting element of a null map/array/row
> ---------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34656
>                 URL: https://issues.apache.org/jira/browse/FLINK-34656
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0
>            Reporter: yisha zhou
>            Priority: Major
>
> In FieldAccessFromTableITCase we can find that the expected result of f0[1] is null when f0 is a null array.
> However, the behavior of the generated code for ITEM is not consistent with the case above. The main code is:
>
> {code:java}
> val arrayAccessCode =
>   s"""
>      |${array.code}
>      |${index.code}
>      |boolean $nullTerm = ${array.nullTerm} || ${index.nullTerm} ||
>      |  $idxStr < 0 || $idxStr >= ${array.resultTerm}.size() || $arrayIsNull;
>      |$resultTypeTerm $resultTerm = $nullTerm ? $defaultTerm : $arrayGet;
>      |""".stripMargin {code}
> If `array.nullTerm` is true, a default value of the element type will be returned, for example -1 for a null bigint array.
> The reason why FieldAccessFromTableITCase gets the expected result is that ReduceExpressionsRule generates expression code for that case like:
> {code:java}
> boolean isNull$0 = true || false ||
>   ((int) 1) - 1 < 0 || ((int) 1) - 1 >=
>   ((org.apache.flink.table.data.ArrayData) null).size() ||
>   ((org.apache.flink.table.data.ArrayData) null).isNullAt(((int) 1) - 1);
> long result$0 = isNull$0 ? -1L : ((org.apache.flink.table.data.ArrayData) null).getLong(((int) 1) - 1);
> if (isNull$0) {
>   out.setField(0, null);
> } else {
>   out.setField(0, result$0);
> } {code}
> The reduced expr will be a null literal.
>
> I think the behavior of getting an element from a null value should be unified.
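For reference, a minimal SQL illustration of the expected semantics (the inline table is hypothetical, not from the issue):

{code:sql}
-- Accessing an element of a NULL array should yield NULL, not the element
-- type's default value (-1L in the generated code above).
SELECT f0[1]
FROM (VALUES (CAST(NULL AS ARRAY<BIGINT NOT NULL>))) AS t(f0);
-- expected result: NULL
{code}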
Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]
gyfora commented on code in PR #806:
URL: https://github.com/apache/flink-kubernetes-operator/pull/806#discussion_r1542314747

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java ##

@@ -306,34 +310,42 @@ protected Map getVertexResources( }

 /**
- * Scale JM deployment to zero to gracefully stop all JM instances before any TMs are stopped.
- * This avoids race conditions between JM shutdown and TM shutdown / failure handling.
+ * Shut down JobManagers gracefully by scaling JM deployment to zero. This avoids race
+ * conditions between JM shutdown and TM shutdown / failure handling.
  *
  * @param jmDeployment
  * @param namespace
  * @param clusterId
- * @param timeout
+ * @param remainingTimeout
  * @return Remaining timeout after the operation.
  */
-private Duration scaleJmToZeroBlocking(
+private Duration shutdownJobManagers(

Review Comment: I will change this, makes sense :)
Re: [PR] [FLINK-34529][table-planner] Introduce FlinkProjectWindowTransposeRule. [flink]
libenchao commented on code in PR #24567:
URL: https://github.com/apache/flink/pull/24567#discussion_r1542265389

## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala ##

@@ -186,6 +187,27 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] { }
   def getUpsertKeys(rel: Window, mq: RelMetadataQuery): JSet[ImmutableBitSet] = {
+    // If it's a ROW_NUMBER rank, then the upsert keys are partition by key and order key.

Review Comment: The upsert keys should be: partition key + order key + row-number column?

## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala ##

@@ -186,6 +187,27 @@
   def getUpsertKeys(rel: Window, mq: RelMetadataQuery): JSet[ImmutableBitSet] = {

Review Comment: And since you changed `FlinkRelMdUniqueKeys`, it would be good to also add tests for it in `FlinkRelMdUniqueKeysTest`.

## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala ##

@@ -656,18 +656,14 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
   "(true,1,book,b,3,2)",
   "(true,2,fruit,a,2,1)",
   "(true,3,book,a,1,2)",
-  "(true,3,book,a,1,2)",

Review Comment: Is this change expected?

## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala ##

@@ -186,6 +187,27 @@
   def getUpsertKeys(rel: Window, mq: RelMetadataQuery): JSet[ImmutableBitSet] = {
+    // If it's a ROW_NUMBER rank, then the upsert keys are partition by key and order key.
+    if (rel.groups.length == 1) {

Review Comment: `Window` can have multiple groups; if any one of them contains `row_number`, it could be an upsert-key candidate.

## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala ##

@@ -186,6 +187,27 @@
   def getUpsertKeys(rel: Window, mq: RelMetadataQuery): JSet[ImmutableBitSet] = {

Review Comment: It seems like a good improvement to both unique-key and upsert-key metadata. Do you think it's possible to put it in `FlinkRelMdUniqueKeys` and reuse it in `FlinkRelMdUpsertKeys`, just like `FlinkRelMdUniqueKeys.getRankUniqueKeys`?

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectWindowTransposeRule.java ##

@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery; + +import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rel.rules.ProjectRemoveRule; +import org.apache.calcite.rel.rules.TransformationRule; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.util.BitSets; +import org.apache.calcite.util.ImmutableBitSet; +import org.immutables.value.Value; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +/** + *
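To make the row-number upsert-key point above concrete, a small sketch (the table and column names are hypothetical, not from the PR):

```sql
-- Within one partition, rn is unique, so (cat, rn) identifies each output row;
-- that combination is the upsert-key candidate discussed in the review above.
SELECT cat, price,
       ROW_NUMBER() OVER (PARTITION BY cat ORDER BY price) AS rn
FROM products;
```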
Re: [PR] [build] fix inconsistent Kafka shading among cdc connectors [flink-cdc]
link3280 commented on PR #2988:
URL: https://github.com/apache/flink-cdc/pull/2988#issuecomment-2024362933

cc @PatrickRen
[jira] [Created] (FLINK-34957) JDBC Autoscaler event handler throws Column 'message' cannot be null
Rui Fan created FLINK-34957:
-------------------------------

             Summary: JDBC Autoscaler event handler throws Column 'message' cannot be null
                 Key: FLINK-34957
                 URL: https://issues.apache.org/jira/browse/FLINK-34957
             Project: Flink
          Issue Type: Bug
          Components: Autoscaler
            Reporter: Rui Fan
            Assignee: Rui Fan
             Fix For: kubernetes-operator-1.9.0
         Attachments: image-2024-03-28-11-57-35-234.png

The JDBC Autoscaler event handler doesn't allow the event message to be null, but the message may be null when we handle an exception. We use the exception message as the event message, and the exception message may itself be null, for example for a TimeoutException (shown in the picture below). Also, recording an event without any message is meaningless; it has no benefit for troubleshooting.

Solution:
* Use the exception message as the event message when the exception message isn't null.
* Use the whole exception as the event message when the exception message is null.

!image-2024-03-28-11-57-35-234.png!
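A minimal sketch of the proposed fallback (names are illustrative, not the actual autoscaler code):

{code:java}
// Prefer the exception message; fall back to the whole exception when the
// message is null (e.g. a TimeoutException created without a message).
static String eventMessage(Throwable error) {
    return error.getMessage() != null ? error.getMessage() : error.toString();
}
{code}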
Re: [PR] [FLINK-34956][doc] Fix the config type wrong of Duration [flink]
flinkbot commented on PR #24581:
URL: https://github.com/apache/flink/pull/24581#issuecomment-2024335796

## CI report:

* 62b4ebc169137a9d72e73481d8bcae51b5c2cc8b UNKNOWN

Bot commands

The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]
yanghuaiGit commented on PR #2916:
URL: https://github.com/apache/flink-cdc/pull/2916#issuecomment-2024335047

Paimon's latest version is 0.7; we should update the Paimon version from 0.6 to 0.7.
[jira] [Updated] (FLINK-34956) The config type is wrong for Duration
[ https://issues.apache.org/jira/browse/FLINK-34956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-34956:
-----------------------------------
    Labels: pull-request-available  (was: )

> The config type is wrong for Duration
> --------------------------------------
>
>                 Key: FLINK-34956
>                 URL: https://issues.apache.org/jira/browse/FLINK-34956
>             Project: Flink
>          Issue Type: Bug
>          Components: Documentation
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.20.0
>
>         Attachments: image-2024-03-28-11-21-31-802.png
>
> The Config type is Boolean, but it should be Duration.
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/
> !image-2024-03-28-11-21-31-802.png!
[PR] [FLINK-34956][doc] Fix the config type wrong of Duration [flink]
1996fanrui opened a new pull request, #24581:
URL: https://github.com/apache/flink/pull/24581

## What is the purpose of the change

The Config type is Boolean, but it should be Duration.
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/

![image](https://github.com/apache/flink/assets/38427477/f1156c3e-e3e2-4d36-88a0-ed2976a598f6)

## Brief change log

- [FLINK-34956][doc] Fix the config type wrong of Duration
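For context, this is how a Duration-typed option is declared (a sketch with a hypothetical key, not an option from the docs page):

```java
import java.time.Duration;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Flink parses values such as "30 s" or "1 min" into java.time.Duration for
// options declared like this, which is why the documented type should be
// Duration rather than Boolean.
public class ExampleOptions {
    public static final ConfigOption<Duration> OPERATION_TIMEOUT =
            ConfigOptions.key("example.operation.timeout")
                    .durationType()
                    .defaultValue(Duration.ofMinutes(1));
}
```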
Re: [PR] [fix][cdc-connector][mysql] Fix NoClassDefFoundError when create new table in mysql cdc source [flink-cdc]
meicao2999 commented on PR #3036:
URL: https://github.com/apache/flink-cdc/pull/3036#issuecomment-2024328996

I have also encountered NoClassDefFoundError. Is there a specific class?
[jira] [Created] (FLINK-34956) The config type is wrong for Duration
Rui Fan created FLINK-34956:
-------------------------------

             Summary: The config type is wrong for Duration
                 Key: FLINK-34956
                 URL: https://issues.apache.org/jira/browse/FLINK-34956
             Project: Flink
          Issue Type: Bug
          Components: Documentation
            Reporter: Rui Fan
            Assignee: Rui Fan
             Fix For: 1.20.0
         Attachments: image-2024-03-28-11-21-31-802.png

The Config type is Boolean, but it should be Duration.
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/

!image-2024-03-28-11-21-31-802.png!
Re: [PR] [cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]
skymilong commented on PR #3194:
URL: https://github.com/apache/flink-cdc/pull/3194#issuecomment-2024327508

> @skymilong Thanks for the PR! According to the code contribution rule of Apache Flink, could you create an issue on [Jira](https://issues.apache.org/jira) for this and include the Jira ID in the PR title and commit message? You can take #3160 as an example.
>
> And the PR has a code formatting issue. Please run `mvn spotless:apply` before pushing your commits.
>
> Thanks!

Thanks a bunch for your help. This is my first time contributing code on GitHub, so I'm still getting the hang of things. I'll create a Jira issue as you suggested and put the ID into the PR title and commit messages. I'll also fix the code formatting issues and update the PR promptly. Appreciate your patience and all your help.
Re: [PR] [FLINK-33985][runtime] Support obtain all partitions existing in cluster through ShuffleMaster. [flink]
zhuzhurk commented on code in PR #24553:
URL: https://github.com/apache/flink/pull/24553#discussion_r1542254332

## flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java ##

@@ -62,7 +61,7 @@ public class NettyShuffleMaster implements ShuffleMaster

 @Nullable private final TieredInternalShuffleMaster tieredInternalShuffleMaster;

-private final Map jobMasters = new HashMap<>();
+private final Map jobMasterShuffleMasters = new HashMap<>();

Review Comment: I prefer to name it `jobShuffleContexts`.
Re: [PR] [build] fix inconsistent Kafka shading among cdc connectors [flink-cdc]
link3280 commented on code in PR #2988:
URL: https://github.com/apache/flink-cdc/pull/2988#discussion_r1542247969

## flink-cdc-connect/flink-cdc-source-connectors/flink-sql-connector-tidb-cdc/pom.xml ##

@@ -60,6 +60,12 @@ under the License.
+
+  org.apache.kafka
+
+  com.ververica.cdc.connectors.shaded.org.apache.kafka

Review Comment: The shaded pattern is updated. Thanks.
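The XML tags were stripped from the quoted diff above; a maven-shade relocation matching that pattern would look roughly like this (a sketch, the exact plugin configuration in the connector pom may differ):

```xml
<!-- Relocates the Kafka client classes into the connector's shaded namespace,
     matching the shaded pattern shown in the diff above. -->
<relocation>
    <pattern>org.apache.kafka</pattern>
    <shadedPattern>com.ververica.cdc.connectors.shaded.org.apache.kafka</shadedPattern>
</relocation>
```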
Re: [PR] [FLINK-34906] Only scale when all tasks are running [flink-kubernetes-operator]
1996fanrui commented on PR #801:
URL: https://github.com/apache/flink-kubernetes-operator/pull/801#issuecomment-2024295706

> Thanks Rui! The changes make sense to me. To Gyula's point, I think we should try to deduplicate the logic such that both the Kubernetes autoscaler and standalone use the same code path.

I could move `JobStatusUtils` from the `flink-autoscaler-standalone` module to the `flink-autoscaler` module; then both the `flink-autoscaler-standalone` and `flink-kubernetes-operator` modules can use it.

Hey @gyfora, what do you think about it?
Re: [PR] [build] fix jackson conflicts among cdc connectors [flink-cdc]
link3280 commented on PR #2987:
URL: https://github.com/apache/flink-cdc/pull/2987#issuecomment-2024287943

ping @leonardBang @PatrickRen
Re: [PR] [docs]Change host to host name in configuration for readme [flink-cdc]
cjj2010 commented on PR #3076:
URL: https://github.com/apache/flink-cdc/pull/3076#issuecomment-2024282059

> Could you

Rebase is complete now.
[jira] (FLINK-34556) Migrate EnumerableToLogicalTableScan
[ https://issues.apache.org/jira/browse/FLINK-34556 ]

Jacky Lau deleted comment on FLINK-34556:
-------------------------------------------

was (Author: jackylau): hi [~snuyanzin] will you also help review this pr?

> Migrate EnumerableToLogicalTableScan
> -------------------------------------
>
>                 Key: FLINK-34556
>                 URL: https://issues.apache.org/jira/browse/FLINK-34556
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0
>            Reporter: Jacky Lau
>            Assignee: Jacky Lau
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.20.0
Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]
liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2024262258

Hi @MartijnVisser, INTERSECT is different from array_intersect. INTERSECT is a set RelNode, like UNION/UNION ALL, and is indeed part of the SQL standard, while array_intersect and array_union are just functions, which don't have any standard.
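A short illustration of the distinction (tables and values are hypothetical; ARRAY_INTERSECT semantics follow the PR's proposal):

```sql
-- Set operator (SQL standard): intersects the row sets of two queries.
SELECT x FROM t1
INTERSECT
SELECT x FROM t2;

-- Function (no SQL standard): intersects two array values within a single row.
SELECT ARRAY_INTERSECT(ARRAY[1, 2, 3], ARRAY[2, 3, 4]); -- [2, 3]
```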
Re: [PR] [FLINK-34955] Upgrade commons-compress to 1.26.0. [flink]
flinkbot commented on PR #24580:
URL: https://github.com/apache/flink/pull/24580#issuecomment-2024252852

## CI report:

* 9741745f8b3a6395e4ab4cc530d11d813addce9f UNKNOWN

Bot commands

The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-34955) Upgrade commons-compress to 1.26.0
[ https://issues.apache.org/jira/browse/FLINK-34955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-34955:
-----------------------------------
    Labels: pull-request-available  (was: )

> Upgrade commons-compress to 1.26.0
> -----------------------------------
>
>                 Key: FLINK-34955
>                 URL: https://issues.apache.org/jira/browse/FLINK-34955
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Shilun Fan
>            Priority: Major
>              Labels: pull-request-available
>
> commons-compress 1.24.0 has CVE issues; try to upgrade to 1.26.0. See the Maven link:
> https://mvnrepository.com/artifact/org.apache.commons/commons-compress
[PR] [FLINK-34955] Upgrade commons-compress to 1.26.0. [flink]
slfan1989 opened a new pull request, #24580:
URL: https://github.com/apache/flink/pull/24580

## What is the purpose of the change

JIRA: [FLINK-34955] Upgrade commons-compress to 1.26.0.

commons-compress 1.24.0 has CVE issues; try to upgrade to 1.26.0. See the Maven link:
https://mvnrepository.com/artifact/org.apache.commons/commons-compress
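For reference, the coordinates of the bumped dependency (illustrative only; the actual pin in Flink's root pom may be managed through a version property):

```xml
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-compress</artifactId>
    <version>1.26.0</version>
</dependency>
```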
[jira] [Created] (FLINK-34955) Upgrade commons-compress to 1.26.0
Shilun Fan created FLINK-34955:
----------------------------------

             Summary: Upgrade commons-compress to 1.26.0
                 Key: FLINK-34955
                 URL: https://issues.apache.org/jira/browse/FLINK-34955
             Project: Flink
          Issue Type: Improvement
            Reporter: Shilun Fan

commons-compress 1.24.0 has CVE issues; try to upgrade to 1.26.0. See the Maven link:
https://mvnrepository.com/artifact/org.apache.commons/commons-compress
[jira] [Closed] (FLINK-34943) Support Flink 1.19, 1.20-SNAPSHOT for JDBC connector
[ https://issues.apache.org/jira/browse/FLINK-34943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhongqiang Gong closed FLINK-34943.
-----------------------------------
    Resolution: Fixed

Fixed by https://github.com/apache/flink-connector-jdbc/pull/107

> Support Flink 1.19, 1.20-SNAPSHOT for JDBC connector
> -----------------------------------------------------
>
>                 Key: FLINK-34943
>                 URL: https://issues.apache.org/jira/browse/FLINK-34943
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / JDBC
>            Reporter: Zhongqiang Gong
>            Priority: Major
>              Labels: pull-request-available
Re: [PR] FLINK-31361 - Added the right shaded dependency for SQL client connector for kafka - Updated kafka.md [flink-connector-kafka]
alpinegizmo commented on code in PR #88:
URL: https://github.com/apache/flink-connector-kafka/pull/88#discussion_r1542159853

## docs/content/docs/connectors/table/kafka.md ##

@@ -674,7 +674,7 @@ Please note that the class path of the login module in `sasl.jaas.config` might
 client dependencies, so you may need to rewrite it with the actual class path of the module in the JAR. SQL client JAR has relocated Kafka client dependencies to `org.apache.flink.kafka.shaded.org.apache.kafka`, then the path of plain login module in code snippets above need to be
-`org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule` when using SQL client JAR.
+`org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule` when using SQL client JAR(flink-sql-connector-kafka-x.xx.x.jar).

Review Comment:
```suggestion
`org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule` when using SQL client JAR (flink-sql-connector-kafka-x.xx.x.jar).
```
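As a usage sketch of the relocated login-module path (topic, broker, and credentials are placeholders, not from the docs):

```sql
CREATE TABLE kafka_t (msg STRING) WITH (
  'connector' = 'kafka',
  'topic' = 'example-topic',
  'properties.bootstrap.servers' = 'broker:9092',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="pass";',
  'format' = 'raw'
);
```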
[PR] Bump org.elasticsearch:elasticsearch from 7.10.2 to 7.17.19 in /flink-connector-elasticsearch-base [flink-connector-elasticsearch]
dependabot[bot] opened a new pull request, #95:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/95

[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=org.elasticsearch:elasticsearch&package-manager=maven&previous-version=7.10.2&new-version=7.17.19)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)

Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting `@dependabot rebase`.

You can disable automated security fix PRs for this repo from the [Security Alerts page](https://github.com/apache/flink-connector-elasticsearch/network/alerts).
Re: [PR] [FLINK-31497][table] Drop the deprecated CatalogViewImpl [flink]
flinkbot commented on PR #24579:
URL: https://github.com/apache/flink/pull/24579#issuecomment-2023946346

## CI report:

* e5666b868012f919d639486030aa0bc5a5f25142 UNKNOWN

Bot commands

The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-31497) Drop the deprecated CatalogViewImpl
[ https://issues.apache.org/jira/browse/FLINK-31497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-31497:
-----------------------------------
    Labels: pull-request-available  (was: )

> Drop the deprecated CatalogViewImpl
> ------------------------------------
>
>                 Key: FLINK-31497
>                 URL: https://issues.apache.org/jira/browse/FLINK-31497
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>            Reporter: WenJun Min
>            Priority: Major
>              Labels: pull-request-available
>
> After https://issues.apache.org/jira/browse/FLINK-29585, CatalogViewImpl is no longer used in the Flink project, so we may be able to drop it now. cc [~snuyanzin]
> But we may have to check whether it is used in other connectors' systems.
[PR] [FLINK-31497][table] Drop the deprecated CatalogViewImpl [flink]
jeyhunkarimov opened a new pull request, #24579:
URL: https://github.com/apache/flink/pull/24579

## What is the purpose of the change

Drop the deprecated CatalogViewImpl

## Brief change log

- Refactor documentation
- Remove CatalogViewImpl
- Refactor python code that uses CatalogViewImpl

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
Re: [PR] [FLINK-33676] Implement RestoreTests for WindowAggregate [flink]
jnh5y commented on PR #23886:
URL: https://github.com/apache/flink/pull/23886#issuecomment-2023874164

@flinkbot run azure
Re: [PR] [FLINK-33805] Implement restore tests for OverAggregate node [flink]
jnh5y commented on code in PR #24565:
URL: https://github.com/apache/flink/pull/24565#discussion_r1541935341

## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateTestPrograms.java ##

@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import static org.apache.flink.table.api.config.TableConfigOptions.LOCAL_TIME_ZONE;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecOverAggregate}. */
+public class OverAggregateTestPrograms {
+
+    static final Row[] DATA = {

Review Comment: Yes.
Re: [PR] [FLINK-34538][docs] Add Autotuning documentation [flink-kubernetes-operator]
mxm merged PR #807:
URL: https://github.com/apache/flink-kubernetes-operator/pull/807
Re: [PR] [FLINK-34538][docs] Add Autotuning documentation [flink-kubernetes-operator]
mxm commented on PR #807:
URL: https://github.com/apache/flink-kubernetes-operator/pull/807#issuecomment-2023796047

Thanks for the quick review. @1996fanrui Let me know if you have any comments. I'll address them post-merge.
Re: [PR] [FLINK-33676] Implement RestoreTests for WindowAggregate [flink]
jnh5y commented on code in PR #23886: URL: https://github.com/apache/flink/pull/23886#discussion_r1541903259 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java: ## @@ -1,528 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.stream; - -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.config.OptimizerConfigOptions; -import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction; -import org.apache.flink.table.planner.utils.StreamTableTestUtil; -import org.apache.flink.table.planner.utils.TableTestBase; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -/** Test json serialization/deserialization for window aggregate. */ -class WindowAggregateJsonPlanTest extends TableTestBase { - -private StreamTableTestUtil util; -private TableEnvironment tEnv; - -@BeforeEach -void setup() { -util = streamTestUtil(TableConfig.getDefault()); -tEnv = util.getTableEnv(); - -String insertOnlyTableDdl = -"CREATE TABLE MyTable (\n" -+ " a INT,\n" -+ " b BIGINT,\n" -+ " c VARCHAR,\n" -+ " `rowtime` AS TO_TIMESTAMP(c),\n" -+ " proctime as PROCTIME(),\n" -+ " WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n" -+ ") WITH (\n" -+ " 'connector' = 'values')\n"; -tEnv.executeSql(insertOnlyTableDdl); - -String changelogTableDdl = -"CREATE TABLE MyCDCTable (\n" -+ " a INT,\n" -+ " b BIGINT,\n" -+ " c VARCHAR,\n" -+ " `rowtime` AS TO_TIMESTAMP(c),\n" -+ " proctime as PROCTIME(),\n" -+ " WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n" -+ ") WITH (\n" -+ " 'connector' = 'values',\n" -+ " 'changelog-mode' = 'I,UA,UB,D')\n"; -tEnv.executeSql(changelogTableDdl); -} - -@Test -void testEventTimeTumbleWindow() { -tEnv.createFunction("concat_distinct_agg", ConcatDistinctAggFunction.class); -String sinkTableDdl = -"CREATE TABLE MySink (\n" -+ " b BIGINT,\n" -+ " window_start TIMESTAMP(3),\n" -+ " window_end TIMESTAMP(3),\n" -+ " cnt BIGINT,\n" -+ " sum_a INT,\n" -+ " distinct_cnt BIGINT,\n" -+ " concat_distinct STRING\n" -+ ") WITH (\n" -+ " 'connector' = 'values')\n"; -tEnv.executeSql(sinkTableDdl); -util.verifyJsonPlan( -"insert into MySink select\n" -+ " b,\n" -+ " window_start,\n" -+ " window_end,\n" -+ " COUNT(*),\n" -+ " SUM(a),\n" -+ " COUNT(DISTINCT c),\n" -+ " concat_distinct_agg(c)\n" -+ "FROM TABLE(\n" -+ " TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n" -+ "GROUP BY b, window_start, window_end"); -} - -@Test -void testEventTimeTumbleWindowWithCDCSource() { -tEnv.createFunction("concat_distinct_agg", 
ConcatDistinctAggFunction.class); -String sinkTableDdl = -"CREATE TABLE MySink (\n" -+ " b BIGINT,\n" -+ " window_start TIMESTAMP(3),\n" -+ " window_end TIMESTAMP(3),\n" -+ " cnt BIGINT,\n" -+ " sum_a INT,\n" -
[jira] [Updated] (FLINK-34954) Kryo input implementation NoFetchingInput fails to handle zero length bytes
[ https://issues.apache.org/jira/browse/FLINK-34954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Qinghui Xu updated FLINK-34954:
-------------------------------
    Description:

If the serialized bytes are empty, `NoFetchingInput` will run into an error when Kryo tries to deserialize them.

Example: a protobuf 3 object that contains only default values will be serialized as a zero-length byte array, and the deserialization later will fail.

Illustration:
{noformat}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{ByteBufferInput, ByteBufferOutput, Input, Output}
import com.google.protobuf.{DescriptorProtos, Message}
import com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput

import java.io.ByteArrayInputStream

object ProtoSerializationTest {
  def main(args: Array[String]) = {
    val chillProtoSerializer = new ProtobufSerializer
    val protomessage = DescriptorProtos.DescriptorProto.getDefaultInstance
    val output: Output = new ByteBufferOutput(1000)
    chillProtoSerializer.write(null, output, protomessage)
    val serialized: Array[Byte] = output.toBytes
    println(s"Serialized : $serialized")
    val input: Input = new NoFetchingInput(new ByteArrayInputStream(serialized))
    val deserialized = chillProtoSerializer.read(null, input, classOf[BillableClick].asInstanceOf[Class[Message]])
    println(deserialized)
  }
}
{noformat}
Error:
{noformat}
Exception in thread "main" java.lang.RuntimeException: Could not create class com.criteo.glup.BillableClickProto$BillableClick
  at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
  at com.criteo.streaming.common.bootstrap.ProtoSerialization$.main(ProtoSerialization.scala:22)
  at ProtoSerialization.main(ProtoSerialization.scala)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
  at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
  at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:332)
  at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
  ... 2 more
Caused by: java.io.EOFException: No more bytes left.
  ... 5 more
{noformat}

was: (the same description with broken code formatting)

> Kryo input implementation NoFetchingInput fails to handle zero length bytes
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-34954
>                 URL: https://issues.apache.org/jira/browse/FLINK-34954
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Qinghui Xu
>            Priority: Major
>
> If the serialized bytes are empty, `NoFetchingInput` will run into an error when Kryo tries to deserialize them.
> Example: a protobuf 3 object that contains only default values will be
> serialized as 0
Re: [PR] [FLINK-33676] Implement RestoreTests for WindowAggregate [flink]
jnh5y commented on code in PR #23886: URL: https://github.com/apache/flink/pull/23886#discussion_r1541901567 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateTestPrograms.java: ## @@ -0,0 +1,528 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.utils.AggregatePhaseStrategy; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +import java.math.BigDecimal; +import java.util.function.Function; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecWindowAggregate}. */ +public class WindowAggregateTestPrograms { + +static final Row[] BEFORE_DATA = { +Row.of("2020-10-10 00:00:01", 1, 1d, 1f, new BigDecimal("1.11"), "Hi", "a"), +Row.of("2020-10-10 00:00:02", 2, 2d, 2f, new BigDecimal("2.22"), "Comment#1", "a"), +Row.of("2020-10-10 00:00:03", 2, 2d, 2f, new BigDecimal("2.22"), "Comment#1", "a"), +Row.of("2020-10-10 00:00:04", 5, 5d, 5f, new BigDecimal("5.55"), null, "a"), +Row.of("2020-10-10 00:00:07", 3, 3d, 3f, null, "Hello", "b"), +// out of order +Row.of("2020-10-10 00:00:06", 6, 6d, 6f, new BigDecimal("6.66"), "Hi", "b"), +Row.of("2020-10-10 00:00:08", 3, null, 3f, new BigDecimal("3.33"), "Comment#2", "a"), +// late event +Row.of("2020-10-10 00:00:04", 5, 5d, null, new BigDecimal("5.55"), "Hi", "a"), +Row.of("2020-10-10 00:00:16", 4, 4d, 4f, new BigDecimal("4.44"), "Hi", "b"), +Row.of("2020-10-10 00:00:32", 7, 7d, 7f, new BigDecimal("7.77"), null, null), +Row.of("2020-10-10 00:00:34", 1, 3d, 3f, new BigDecimal("3.33"), "Comment#3", "b") +}; + +static final Row[] AFTER_DATA = { +Row.of("2020-10-10 00:00:40", 10, 3d, 3f, new BigDecimal("4.44"), "Comment#4", "a"), +Row.of("2020-10-10 00:00:42", 11, 4d, 4f, new BigDecimal("5.44"), "Comment#5", "d"), +Row.of("2020-10-10 00:00:43", 12, 5d, 5f, new BigDecimal("6.44"), "Comment#6", "c"), +Row.of("2020-10-10 00:00:44", 13, 6d, 6f, new BigDecimal("7.44"), "Comment#7", "d") +}; + +static final Function SOURCE_BUILDER = +str -> +SourceTestStep.newBuilder(str) +.addSchema( +"ts STRING", +"a_int INT", +"b_double DOUBLE", +"c_float FLOAT", +"d_bigdec DECIMAL(10, 2)", +"`comment` STRING", +"name STRING", +"`rowtime` AS TO_TIMESTAMP(`ts`)", +"`proctime` AS PROCTIME()", +"WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND") +.addOption("changelog-mode", "I,UA,UB,D") +.producedBeforeRestore(BEFORE_DATA) +.producedAfterRestore(AFTER_DATA); +static final SourceTestStep SOURCE = 
SOURCE_BUILDER.apply("window_source_t").build(); + +static final SourceTestStep CDC_SOURCE = +SOURCE_BUILDER +.apply("cdc_window_source_t") +.addOption("changelog-mode", "I,UA,UB,D") +.build(); + +static final String[] TUMBLE_EVENT_TIME_BEFORE_ROWS = { +"+I[a, 2020-10-10T00:00, 2020-10-10T00:00:05, 4, 10, 2]", +"+I[a, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 1, 3, 1]", +"+I[b, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 2, 9, 2]", +"+I[b, 2020-10-10T00:00:15, 2020-10-10T00:00:20, 1, 4, 1]" +}; + +public static final String[] TUMBLE_EVENT_TIME_AFTER_ROWS = { +"+I[b, 2020-10-10T00:00:30, 2020-10-10T00:00:35,
Re: [PR] [FLINK-33676] Implement RestoreTests for WindowAggregate [flink]
jnh5y commented on code in PR #23886:
URL: https://github.com/apache/flink/pull/23886#discussion_r1541901325

## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateTestPrograms.java ##

@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+import java.util.function.Function;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecWindowAggregate}. */
+public class WindowAggregateTestPrograms {
+
+    static final Row[] BEFORE_DATA = {

Review Comment: Yes. Doing it.
Re: [PR] [FLINK-33676] Implement RestoreTests for WindowAggregate [flink]
jnh5y commented on code in PR #23886: URL: https://github.com/apache/flink/pull/23886#discussion_r1541900291 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateTestPrograms.java: ## @@ -0,0 +1,528 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.utils.AggregatePhaseStrategy; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +import java.math.BigDecimal; +import java.util.function.Function; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecWindowAggregate}. */ +public class WindowAggregateTestPrograms { + +static final Row[] BEFORE_DATA = { +Row.of("2020-10-10 00:00:01", 1, 1d, 1f, new BigDecimal("1.11"), "Hi", "a"), +Row.of("2020-10-10 00:00:02", 2, 2d, 2f, new BigDecimal("2.22"), "Comment#1", "a"), +Row.of("2020-10-10 00:00:03", 2, 2d, 2f, new BigDecimal("2.22"), "Comment#1", "a"), +Row.of("2020-10-10 00:00:04", 5, 5d, 5f, new BigDecimal("5.55"), null, "a"), +Row.of("2020-10-10 00:00:07", 3, 3d, 3f, null, "Hello", "b"), +// out of order +Row.of("2020-10-10 00:00:06", 6, 6d, 6f, new BigDecimal("6.66"), "Hi", "b"), +Row.of("2020-10-10 00:00:08", 3, null, 3f, new BigDecimal("3.33"), "Comment#2", "a"), +// late event +Row.of("2020-10-10 00:00:04", 5, 5d, null, new BigDecimal("5.55"), "Hi", "a"), +Row.of("2020-10-10 00:00:16", 4, 4d, 4f, new BigDecimal("4.44"), "Hi", "b"), +Row.of("2020-10-10 00:00:32", 7, 7d, 7f, new BigDecimal("7.77"), null, null), +Row.of("2020-10-10 00:00:34", 1, 3d, 3f, new BigDecimal("3.33"), "Comment#3", "b") +}; + +static final Row[] AFTER_DATA = { +Row.of("2020-10-10 00:00:40", 10, 3d, 3f, new BigDecimal("4.44"), "Comment#4", "a"), +Row.of("2020-10-10 00:00:42", 11, 4d, 4f, new BigDecimal("5.44"), "Comment#5", "d"), +Row.of("2020-10-10 00:00:43", 12, 5d, 5f, new BigDecimal("6.44"), "Comment#6", "c"), +Row.of("2020-10-10 00:00:44", 13, 6d, 6f, new BigDecimal("7.44"), "Comment#7", "d") +}; + +static final Function SOURCE_BUILDER = +str -> +SourceTestStep.newBuilder(str) +.addSchema( +"ts STRING", +"a_int INT", +"b_double DOUBLE", +"c_float FLOAT", +"d_bigdec DECIMAL(10, 2)", +"`comment` STRING", +"name STRING", +"`rowtime` AS TO_TIMESTAMP(`ts`)", +"`proctime` AS PROCTIME()", +"WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND") +.addOption("changelog-mode", "I,UA,UB,D") +.producedBeforeRestore(BEFORE_DATA) +.producedAfterRestore(AFTER_DATA); +static final SourceTestStep SOURCE = 
SOURCE_BUILDER.apply("window_source_t").build(); + +static final SourceTestStep CDC_SOURCE = +SOURCE_BUILDER +.apply("cdc_window_source_t") +.addOption("changelog-mode", "I,UA,UB,D") +.build(); + +static final String[] TUMBLE_EVENT_TIME_BEFORE_ROWS = { +"+I[a, 2020-10-10T00:00, 2020-10-10T00:00:05, 4, 10, 2]", +"+I[a, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 1, 3, 1]", +"+I[b, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 2, 9, 2]", +"+I[b, 2020-10-10T00:00:15, 2020-10-10T00:00:20, 1, 4, 1]" +}; + +public static final String[] TUMBLE_EVENT_TIME_AFTER_ROWS = { +"+I[b, 2020-10-10T00:00:30, 2020-10-10T00:00:35,
[jira] [Created] (FLINK-34954) Kryo input implementation NoFetchingInput fails to handle zero length bytes
Qinghui Xu created FLINK-34954: -- Summary: Kryo input implementation NoFetchingInput fails to handle zero length bytes Key: FLINK-34954 URL: https://issues.apache.org/jira/browse/FLINK-34954 Project: Flink Issue Type: Bug Reporter: Qinghui Xu If the serialized bytes are empty, `NoFetchingInput` will run into an error when Kryo tries to deserialize them. Example: a protobuf 3 object that contains only default values will be serialized as a zero-length byte array, and the deserialization later will fail. Illustration:
```
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{ByteBufferInput, ByteBufferOutput, Input, Output}
import com.google.protobuf.{DescriptorProtos, Message}
import com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput

import java.io.ByteArrayInputStream

object ProtoSerializationTest {
  def main(args: Array[String]) = {
    val chillProtoSerializer = new ProtobufSerializer
    val protomessage = DescriptorProtos.DescriptorProto.getDefaultInstance
    val output: Output = new ByteBufferOutput(1000)
    chillProtoSerializer.write(null, output, protomessage)
    val serialized: Array[Byte] = output.toBytes
    println(s"Serialized : $serialized")
    val input: Input = new NoFetchingInput(new ByteArrayInputStream(serialized))
    // read back the same message type that was written above
    val deserialized =
      chillProtoSerializer.read(null, input, classOf[DescriptorProtos.DescriptorProto].asInstanceOf[Class[Message]])
    println(deserialized)
  }
}
```
Error
```
Exception in thread "main" java.lang.RuntimeException: Could not create class com.criteo.glup.BillableClickProto$BillableClick
    at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
    at com.criteo.streaming.common.bootstrap.ProtoSerialization$.main(ProtoSerialization.scala:22)
    at ProtoSerialization.main(ProtoSerialization.scala)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
    at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
    at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:332)
    at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
    ... 2 more
Caused by: java.io.EOFException: No more bytes left.
    ... 5 more
```
-- This message was sent by Atlassian Jira (v8.20.10#820010)
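The root cause is reproducible without Kryo or protobuf at all: `java.io.ByteArrayInputStream` checks for end-of-stream before it checks whether the requested length is zero, so a zero-byte read against an exhausted stream returns -1. A minimal standalone sketch (not from the ticket) of the behavior that trips `NoFetchingInput#readBytes`:
```java
import java.io.ByteArrayInputStream;

public class ZeroLengthReadDemo {

    public static void main(String[] args) {
        // an empty payload, as produced by a default-valued protobuf 3 message
        ByteArrayInputStream in = new ByteArrayInputStream(new byte[0]);

        // ByteArrayInputStream tests for end-of-stream before it tests
        // len == 0, so even a zero-byte read request reports EOF here:
        int n = in.read(new byte[0], 0, 0);
        System.out.println(n); // prints -1, not 0
    }
}
```
A read loop that maps the -1 straight to "No more bytes left." will therefore fail on empty payloads; guarding the zero-length case before touching the stream avoids the exception.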
Re: [PR] [FLINK-33676] Implement RestoreTests for WindowAggregate [flink]
jnh5y commented on code in PR #23886: URL: https://github.com/apache/flink/pull/23886#discussion_r1541896906 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateTestPrograms.java: ## @@ -0,0 +1,528 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.utils.AggregatePhaseStrategy; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +import java.math.BigDecimal; +import java.util.function.Function; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecWindowAggregate}. */ +public class WindowAggregateTestPrograms { + +static final Row[] BEFORE_DATA = { +Row.of("2020-10-10 00:00:01", 1, 1d, 1f, new BigDecimal("1.11"), "Hi", "a"), +Row.of("2020-10-10 00:00:02", 2, 2d, 2f, new BigDecimal("2.22"), "Comment#1", "a"), +Row.of("2020-10-10 00:00:03", 2, 2d, 2f, new BigDecimal("2.22"), "Comment#1", "a"), +Row.of("2020-10-10 00:00:04", 5, 5d, 5f, new BigDecimal("5.55"), null, "a"), +Row.of("2020-10-10 00:00:07", 3, 3d, 3f, null, "Hello", "b"), +// out of order +Row.of("2020-10-10 00:00:06", 6, 6d, 6f, new BigDecimal("6.66"), "Hi", "b"), +Row.of("2020-10-10 00:00:08", 3, null, 3f, new BigDecimal("3.33"), "Comment#2", "a"), +// late event +Row.of("2020-10-10 00:00:04", 5, 5d, null, new BigDecimal("5.55"), "Hi", "a"), +Row.of("2020-10-10 00:00:16", 4, 4d, 4f, new BigDecimal("4.44"), "Hi", "b"), +Row.of("2020-10-10 00:00:32", 7, 7d, 7f, new BigDecimal("7.77"), null, null), +Row.of("2020-10-10 00:00:34", 1, 3d, 3f, new BigDecimal("3.33"), "Comment#3", "b") +}; + +static final Row[] AFTER_DATA = { +Row.of("2020-10-10 00:00:40", 10, 3d, 3f, new BigDecimal("4.44"), "Comment#4", "a"), +Row.of("2020-10-10 00:00:42", 11, 4d, 4f, new BigDecimal("5.44"), "Comment#5", "d"), +Row.of("2020-10-10 00:00:43", 12, 5d, 5f, new BigDecimal("6.44"), "Comment#6", "c"), +Row.of("2020-10-10 00:00:44", 13, 6d, 6f, new BigDecimal("7.44"), "Comment#7", "d") +}; + +static final Function<String, SourceTestStep.Builder> SOURCE_BUILDER = +str -> +SourceTestStep.newBuilder(str) +.addSchema( +"ts STRING", +"a_int INT", +"b_double DOUBLE", +"c_float FLOAT", +"d_bigdec DECIMAL(10, 2)", +"`comment` STRING", +"name STRING", +"`rowtime` AS TO_TIMESTAMP(`ts`)", +"`proctime` AS PROCTIME()", +"WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND") +.addOption("changelog-mode", "I,UA,UB,D") +.producedBeforeRestore(BEFORE_DATA) +.producedAfterRestore(AFTER_DATA); +static final SourceTestStep SOURCE =
SOURCE_BUILDER.apply("window_source_t").build(); + +static final SourceTestStep CDC_SOURCE = +SOURCE_BUILDER +.apply("cdc_window_source_t") +.addOption("changelog-mode", "I,UA,UB,D") +.build(); Review Comment: Ah, I made a mistake; removing the changelog for the SOURCE_BUILDER. Just to say it out loud, the CDC_SOURCE is set up to help cover some of the tests which @xuyangzhong added like this: https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java#L98 Overall, there are several orthogonal options, and at some point,
[jira] [Commented] (FLINK-34950) Disable spotless on Java 21 for connector-shared-utils
[ https://issues.apache.org/jira/browse/FLINK-34950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831486#comment-17831486 ] Sergey Nuyanzin commented on FLINK-34950: - Merged as [d719c95235db17f5932d1bb5d917f7d6e195c371|https://github.com/apache/flink-connector-shared-utils/commit/d719c95235db17f5932d1bb5d917f7d6e195c371] > Disable spotless on Java 21 for connector-shared-utils > -- > > Key: FLINK-34950 > URL: https://issues.apache.org/jira/browse/FLINK-34950 > Project: Flink > Issue Type: Bug > Components: Connectors / Parent >Affects Versions: connector-parent-1.1.0 >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > After https://github.com/apache/flink-connector-shared-utils/pull/19, > spotless stopped being skipped for Java 17+ in the parent pom. > However, we still need to skip it for Java 21+. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34950] Disable spotless for Java 21+ [flink-connector-shared-utils]
snuyanzin merged PR #39: URL: https://github.com/apache/flink-connector-shared-utils/pull/39 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34324][ci] Replaces AWS-based S3 e2e tests with Minio-backed version [flink]
RyanSkraba commented on code in PR #24465: URL: https://github.com/apache/flink/pull/24465#discussion_r1541478084 ## flink-end-to-end-tests/test-scripts/test_file_sink.sh: ## @@ -79,30 +42,69 @@ TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram. # sorted content of part files ### function get_complete_result { - if [ "${OUT_TYPE}" == "s3" ]; then -s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" "part-" true - fi find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g } ### # Get total number of lines in part files. # # Globals: -# S3_PREFIX +# OUTPUT_PATH # Arguments: # None # Returns: # line number in part files ### function get_total_number_of_valid_lines { - if [ "${OUT_TYPE}" == "local" ]; then -get_complete_result | wc -l | tr -d '[:space:]' - elif [ "${OUT_TYPE}" == "s3" ]; then -s3_get_number_of_lines_by_prefix "${S3_PREFIX}" "part-" - fi + get_complete_result | wc -l | tr -d '[:space:]' } +if [ "${OUT_TYPE}" == "local" ]; then + echo "[INFO] Test run in local environment: No S3 environment is not loaded." +elif [ "${OUT_TYPE}" == "s3" ]; then + source "$(dirname "$0")"/common_s3_minio.sh + s3_setup hadoop + + # overwrites JOB_OUTPUT_PATH to point to S3 + S3_DATA_PREFIX="${RANDOM_PREFIX}" + S3_CHECKPOINT_PREFIX="${RANDOM_PREFIX}-chk" + JOB_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/${S3_DATA_PREFIX}" + set_config_key "state.checkpoints.dir" "s3://$IT_CASE_S3_BUCKET/${S3_CHECKPOINT_PREFIX}" Review Comment: In the original, there was a line `mkdir -p "$OUTPUT_PATH-chk"`. Is this no longer necessary? ## flink-end-to-end-tests/test-scripts/test_file_sink.sh: ## @@ -79,30 +42,69 @@ TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram. # sorted content of part files ### function get_complete_result { - if [ "${OUT_TYPE}" == "s3" ]; then -s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" "part-" true - fi find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g } ### # Get total number of lines in part files. # # Globals: -# S3_PREFIX +# OUTPUT_PATH # Arguments: # None # Returns: # line number in part files ### function get_total_number_of_valid_lines { - if [ "${OUT_TYPE}" == "local" ]; then -get_complete_result | wc -l | tr -d '[:space:]' - elif [ "${OUT_TYPE}" == "s3" ]; then -s3_get_number_of_lines_by_prefix "${S3_PREFIX}" "part-" - fi + get_complete_result | wc -l | tr -d '[:space:]' } +if [ "${OUT_TYPE}" == "local" ]; then + echo "[INFO] Test run in local environment: No S3 environment is not loaded." 
+elif [ "${OUT_TYPE}" == "s3" ]; then + source "$(dirname "$0")"/common_s3_minio.sh + s3_setup hadoop + + # overwrites JOB_OUTPUT_PATH to point to S3 + S3_DATA_PREFIX="${RANDOM_PREFIX}" + S3_CHECKPOINT_PREFIX="${RANDOM_PREFIX}-chk" + JOB_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/${S3_DATA_PREFIX}" + set_config_key "state.checkpoints.dir" "s3://$IT_CASE_S3_BUCKET/${S3_CHECKPOINT_PREFIX}" + + # overwrites implementation for local runs + function get_complete_result { +# copies the data from S3 to the local OUTPUT_PATH +s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$FILE_SINK_TEST_TEMP_SUBFOLDER" "part-" true + +# and prints the sorted output +find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g + } + + # overwrites implementation for local runs + function get_total_number_of_valid_lines { +s3_get_number_of_lines_by_prefix "${FILE_SINK_TEST_TEMP_SUBFOLDER}" "part-" + } + + # make sure we delete the file at the end + function out_cleanup { +s3_delete_by_full_path_prefix "${S3_DATA_PREFIX}" +s3_delete_by_full_path_prefix "${S3_CHECKPOINT_PREFIX}" + } + + on_exit out_cleanup +else + echo "[ERROR] Unknown out type: ${OUT_TYPE}" + exit 1 +fi + +# randomly set up openSSL with dynamically/statically linked libraries Review Comment: Did you mean to copy the `OPENSSL_LINKAGE` lines here? ## flink-end-to-end-tests/test-scripts/test_file_sink.sh: ## @@ -79,30 +42,69 @@ TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram. # sorted content of part files ### function get_complete_result { - if [ "${OUT_TYPE}" == "s3" ]; then -s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" "part-" true - fi find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g } ### # Get total number of lines in part files. # # Globals: -# S3_PREFIX +# OUTPUT_PATH # Arguments: # None # Returns: # line number in
[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34538: --- Labels: pull-request-available (was: ) > Tune Flink config of autoscaled jobs > > > Key: FLINK-34538 > URL: https://issues.apache.org/jira/browse/FLINK-34538 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > > Umbrella issue to tackle tuning the Flink configuration as part of Flink > Autoscaling. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-34538][docs] Add Autotuning documentation [flink-kubernetes-operator]
mxm opened a new pull request, #807: URL: https://github.com/apache/flink-kubernetes-operator/pull/807 This adds documentation for Flink Autotuning. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-34551) Align retry mechanisms of FutureUtils
[ https://issues.apache.org/jira/browse/FLINK-34551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831453#comment-17831453 ] Kumar Mallikarjuna edited comment on FLINK-34551 at 3/27/24 4:38 PM: - Hi [~mapohl] , I've opened a PR ([#24578|https://github.com/apache/flink/pull/24578]) refactoring `retryOperation()` and `retryOperationWithDelay()`. I haven't refactored `retrySuccessfulWithDelay()`, since you had some changes on that already in your PR ([#24309|https://github.com/apache/flink/pull/24309]). Would you mind reviewing? was (Author: JIRAUSER303984): Hi [~mapohl] , I've opened a PR ([#24578|https://github.com/apache/flink/pull/24578]) refactoring `retryOperation()` and `retryOperationWithDelay()`. I haven't refactored `retrySuccessfulWithDelay()`, since you had some changes on that already in your PR ([#24309|https://github.com/apache/flink/pull/24309]). > Align retry mechanisms of FutureUtils > - > > Key: FLINK-34551 > URL: https://issues.apache.org/jira/browse/FLINK-34551 > Project: Flink > Issue Type: Technical Debt > Components: API / Core >Affects Versions: 1.20.0 >Reporter: Matthias Pohl >Assignee: Kumar Mallikarjuna >Priority: Major > Labels: pull-request-available > > The retry mechanisms of FutureUtils include quite a bit of redundant code > which makes it hard to understand and to extend. The logic should be aligned > properly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34551) Align retry mechanisms of FutureUtils
[ https://issues.apache.org/jira/browse/FLINK-34551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831453#comment-17831453 ] Kumar Mallikarjuna commented on FLINK-34551: Hi [~mapohl] , I've opened a PR ([#24578|https://github.com/apache/flink/pull/24578]) refactoring `retryOperation()` and `retryOperationWithDelay()`. I haven't refactored `retrySuccessfulWithDelay()`, since you had some changes on that already in your PR ([#24309|https://github.com/apache/flink/pull/24309]). > Align retry mechanisms of FutureUtils > - > > Key: FLINK-34551 > URL: https://issues.apache.org/jira/browse/FLINK-34551 > Project: Flink > Issue Type: Technical Debt > Components: API / Core >Affects Versions: 1.20.0 >Reporter: Matthias Pohl >Assignee: Kumar Mallikarjuna >Priority: Major > Labels: pull-request-available > > The retry mechanisms of FutureUtils include quite a bit of redundant code > which makes it hard to understand and to extend. The logic should be aligned > properly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34551][core] Refactor retry logic in `FutureUtils` [flink]
flinkbot commented on PR #24578: URL: https://github.com/apache/flink/pull/24578#issuecomment-2023246471 ## CI report: * 791954d16c88be92ae08faa1775f8bbeb583c727 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34551) Align retry mechanisms of FutureUtils
[ https://issues.apache.org/jira/browse/FLINK-34551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34551: --- Labels: pull-request-available (was: ) > Align retry mechanisms of FutureUtils > - > > Key: FLINK-34551 > URL: https://issues.apache.org/jira/browse/FLINK-34551 > Project: Flink > Issue Type: Technical Debt > Components: API / Core >Affects Versions: 1.20.0 >Reporter: Matthias Pohl >Assignee: Kumar Mallikarjuna >Priority: Major > Labels: pull-request-available > > The retry mechanisms of FutureUtils include quite a bit of redundant code > which makes it hard to understand and to extend. The logic should be aligned > properly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-34551][core] Refactor retry logic in `FutureUtils` [flink]
kumar-mallikarjuna opened a new pull request, #24578: URL: https://github.com/apache/flink/pull/24578 ## What is the purpose of the change FutureUtils has several retry methods with overlapping implementations. This PR refactors the `retryOperation()` and `retryOperationWithDelay()` methods. ## Brief change log - Make `retryOperation()` support scheduled execution - Remove `retryOperationWithDelay()` ## Verifying this change This change is already covered by existing tests, such as `FutureUtilsTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (don't know) - The fabric8 client uses `FutureUtils` - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
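For readers following along, the two methods being merged differ mainly in whether the next attempt runs immediately or on a scheduled executor after a delay. A rough sketch of the unified shape (illustrative only, not the actual `FutureUtils` code; all names here are made up):
```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

public final class RetrySketch {

    /** Retries an async operation; Duration.ZERO reproduces the immediate-retry variant. */
    public static <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> operation,
            int retries,
            Duration delay,
            Predicate<Throwable> retryPredicate,
            ScheduledExecutorService executor) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retries, delay, retryPredicate, executor, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation,
            int retriesLeft,
            Duration delay,
            Predicate<Throwable> retryPredicate,
            ScheduledExecutorService executor,
            CompletableFuture<T> result) {
        operation.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (retriesLeft > 0 && retryPredicate.test(error)) {
                // schedule the next attempt instead of recursing inline
                executor.schedule(
                        () -> attempt(operation, retriesLeft - 1, delay, retryPredicate, executor, result),
                        delay.toMillis(),
                        TimeUnit.MILLISECONDS);
            } else {
                result.completeExceptionally(error);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        String value =
                retry(
                                () ->
                                        calls.incrementAndGet() < 3
                                                ? CompletableFuture.<String>failedFuture(
                                                        new RuntimeException("try again"))
                                                : CompletableFuture.completedFuture("ok"),
                                5,
                                Duration.ofMillis(10),
                                t -> true,
                                pool)
                        .get();
        System.out.println(value + " after " + calls.get() + " attempts"); // ok after 3 attempts
        pool.shutdown();
    }
}
```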
[PR] FLINK-31361 - Added the right shaded dependency for SQL client connector for kafka - Updated kafka.md [flink-connector-kafka]
diptimanr opened a new pull request, #88: URL: https://github.com/apache/flink-connector-kafka/pull/88 Modified the documentation on using the right dependency for 'properties.sasl.jaas.config'. When 'flink-sql-connector-kafka.jar' is used, the existing documentation doesn't reference the shaded dependency. Also added the name of the jar file (flink-sql-connector-kafka-x.xx.x.jar). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-31361) job created by sql-client can't authenticate to kafka, can't find org.apache.kafka.common.security.plain.PlainLoginModule
[ https://issues.apache.org/jira/browse/FLINK-31361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31361: --- Labels: pull-request-available (was: ) > job created by sql-client can't authenticate to kafka, can't find > org.apache.kafka.common.security.plain.PlainLoginModule > - > > Key: FLINK-31361 > URL: https://issues.apache.org/jira/browse/FLINK-31361 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.16.1 >Reporter: David Anderson >Priority: Major > Labels: pull-request-available > > I'm working with this SQL DDL: > {noformat} > CREATE TABLE pageviews_sink ( > `url` STRING, > `user_id` STRING, > `browser` STRING, > `ts` TIMESTAMP_LTZ(3) > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'pageviews', > 'properties.bootstrap.servers' = 'xxx.confluent.cloud:9092', > 'properties.security.protocol'='SASL_SSL', > 'properties.sasl.mechanism'='PLAIN', > > 'properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule > required username="xxx" password="xxx";', > 'key.format' = 'json', > 'key.fields' = 'url', > 'value.format' = 'json' > ); > {noformat} > With {{flink-sql-connector-kafka-1.16.1.jar}} in the lib directory, this > fails with > {noformat} > Caused by: javax.security.auth.login.LoginException: No LoginModule found for > org.apache.kafka.common.security.plain.PlainLoginModule{noformat} > As a workaround I've found that it does work if I provide both > > {{flink-connector-kafka-1.16.1.jar}} > {{kafka-clients-3.2.3.jar}} > > in the lib directory. It seems like the relocation applied in the SQL > connector isn't working properly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
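For anyone hitting the same error: the gist of the documentation fix above is that with only the fat `flink-sql-connector-kafka-x.xx.x.jar` on the classpath, `kafka-clients` is relocated, so the JAAS option must name the shaded class. A sketch in plain Kafka client property terms (the `org.apache.flink.kafka.shaded` prefix follows the connector's relocation pattern; verify it against the jar version actually in use):
```java
import java.util.Properties;

public class ShadedJaasConfigExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // With the shaded SQL connector jar, PlainLoginModule lives under the
        // relocated package, not under org.apache.kafka.common.security.plain:
        props.put(
                "sasl.jaas.config",
                "org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule"
                        + " required username=\"xxx\" password=\"xxx\";");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```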
Re: [PR] FLINK-31361 - Added the right shaded dependency for SQL client connector for kafka - Updated kafka.md [flink-connector-kafka]
boring-cyborg[bot] commented on PR #88: URL: https://github.com/apache/flink-connector-kafka/pull/88#issuecomment-2023159980 Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-34937) Apache Infra GHA policy update
[ https://issues.apache.org/jira/browse/FLINK-34937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831422#comment-17831422 ] Matthias Pohl edited comment on FLINK-34937 at 3/27/24 3:45 PM: We should pin all actions (i.e. use the git SHA rather than a version tag) for external actions (anything other than {{actions/\*}}, {{github/\*}} and {{apache/\*}} prefixed actions). That's not the case right now. was (Author: mapohl): We should pin all actions (i.e. use the git SHA rather than a version tag) for external actions (anything other than {{actions/*}}, {{github/*}} and {{apache/*}} prefixed actions). That's not the case right now. > Apache Infra GHA policy update > -- > > Key: FLINK-34937 > URL: https://issues.apache.org/jira/browse/FLINK-34937 > Project: Flink > Issue Type: Sub-task > Components: Build System / CI >Affects Versions: 1.19.0, 1.18.1, 1.20.0 >Reporter: Matthias Pohl >Priority: Major > > There is a policy update [announced in the infra > ML|https://www.mail-archive.com/jdo-dev@db.apache.org/msg13638.html] which > asked Apache projects to limit the number of runners per job. Additionally, > the [GHA policy|https://infra.apache.org/github-actions-policy.html] is > referenced which I wasn't aware of when working on the action workflow. > This issue is about applying the policy to the Flink GHA workflows. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34937) Apache Infra GHA policy update
[ https://issues.apache.org/jira/browse/FLINK-34937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831422#comment-17831422 ] Matthias Pohl commented on FLINK-34937: --- We should pin all actions (i.e. use the git SHA rather than a version tag) for external actions (anything other than {{actions/*}}, {{github/*}} and {{apache/*}} prefixed actions). That's not the case right now. > Apache Infra GHA policy update > -- > > Key: FLINK-34937 > URL: https://issues.apache.org/jira/browse/FLINK-34937 > Project: Flink > Issue Type: Sub-task > Components: Build System / CI >Affects Versions: 1.19.0, 1.18.1, 1.20.0 >Reporter: Matthias Pohl >Priority: Major > > There is a policy update [announced in the infra > ML|https://www.mail-archive.com/jdo-dev@db.apache.org/msg13638.html] which > asked Apache projects to limit the number of runners per job. Additionally, > the [GHA policy|https://infra.apache.org/github-actions-policy.html] is > referenced which I wasn't aware of when working on the action workflow. > This issue is about applying the policy to the Flink GHA workflows. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33376][coordination] Extend ZooKeeper Curator configurations [flink]
XComp commented on PR #24563: URL: https://github.com/apache/flink/pull/24563#issuecomment-2023008617 no worries :-) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33376][coordination] Extend ZooKeeper Curator configurations [flink]
JTaky commented on PR #24563: URL: https://github.com/apache/flink/pull/24563#issuecomment-2023006486 Thanks @XComp for the review, and sorry for more back and forth than there should have been. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]
mxm commented on code in PR #806: URL: https://github.com/apache/flink-kubernetes-operator/pull/806#discussion_r1541299679 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java: ## @@ -306,34 +310,42 @@ protected Map getVertexResources( } /** - * Scale JM deployment to zero to gracefully stop all JM instances before any TMs are stopped. - * This avoids race conditions between JM shutdown and TM shutdown / failure handling. + * Shut down JobManagers gracefully by scaling JM deployment to zero. This avoids race + * conditions between JM shutdown and TM shutdown / failure handling. * * @param jmDeployment * @param namespace * @param clusterId - * @param timeout + * @param remainingTimeout * @return Remaining timeout after the operation. */ -private Duration scaleJmToZeroBlocking( +private Duration shutdownJobManagers( EditReplacePatchable jmDeployment, String namespace, String clusterId, -Duration timeout) { -return deleteBlocking( -"Scaling JobManager Deployment to zero", -() -> { -try { - jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO); -} catch (Exception ignore) { -// Ignore all errors here as this is an optional step -return null; -} -return kubernetesClient -.pods() -.inNamespace(namespace) - .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId)); -}, -timeout); +Duration remainingTimeout) { + +// We use only half of the shutdown timeout but at most one minute as the main point +// here is to initiate JM shutdown before the TMs +var jmShutdownTimeout = +ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, remainingTimeout.dividedBy(2)); Review Comment: I was missing that a Foreground delete call would clean up TaskManagers through delete propagation, even if the JM pod was stuck. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
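The rule being debated above is compact enough to state on its own: take half of the remaining shutdown timeout, capped at one minute. A standalone sketch (the one-minute cap is taken from the comment in the diff; the operator's actual constant may differ):
```java
import java.time.Duration;

public class JmShutdownTimeoutSketch {

    // assumption drawn from the diff's comment: "at most one minute"
    static final Duration JM_SHUTDOWN_MAX_WAIT = Duration.ofMinutes(1);

    static Duration jmShutdownTimeout(Duration remainingTimeout) {
        // half of the remaining timeout, but never more than the cap
        Duration half = remainingTimeout.dividedBy(2);
        return half.compareTo(JM_SHUTDOWN_MAX_WAIT) < 0 ? half : JM_SHUTDOWN_MAX_WAIT;
    }

    public static void main(String[] args) {
        System.out.println(jmShutdownTimeout(Duration.ofMinutes(5)));  // PT1M
        System.out.println(jmShutdownTimeout(Duration.ofSeconds(30))); // PT15S
    }
}
```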
[jira] [Resolved] (FLINK-34419) flink-docker's .github/workflows/snapshot.yml doesn't support JDK 17 and 21
[ https://issues.apache.org/jira/browse/FLINK-34419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl resolved FLINK-34419. --- Resolution: Fixed > flink-docker's .github/workflows/snapshot.yml doesn't support JDK 17 and 21 > --- > > Key: FLINK-34419 > URL: https://issues.apache.org/jira/browse/FLINK-34419 > Project: Flink > Issue Type: Technical Debt > Components: Build System / CI >Reporter: Matthias Pohl >Assignee: Muhammet Orazov >Priority: Major > Labels: pull-request-available, starter > > [.github/workflows/snapshot.yml|https://github.com/apache/flink-docker/blob/master/.github/workflows/snapshot.yml#L40] > needs to be updated: JDK 17 support was added in 1.18 (FLINK-15736). JDK 21 > support was added in 1.19 (FLINK-33163) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-34419) flink-docker's .github/workflows/snapshot.yml doesn't support JDK 17 and 21
[ https://issues.apache.org/jira/browse/FLINK-34419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831391#comment-17831391 ] Matthias Pohl edited comment on FLINK-34419 at 3/27/24 2:56 PM: master: 9e0041a2c9dace4bf3f32815e3e24e24385b179b dev-master: 1460077743b29e17edd0a2d7efd3897fa097988d dev-1.19: 67d7c46ed382a665e941f0cf1f1606d10f87dee5 dev-1.18: d93d911b015e535fc2b6f1426c3b36229ff3d02a was (Author: mapohl): master: 9e0041a2c9dace4bf3f32815e3e24e24385b179b dev-master: tba dev-1.19: tba dev-1.18: tba > flink-docker's .github/workflows/snapshot.yml doesn't support JDK 17 and 21 > --- > > Key: FLINK-34419 > URL: https://issues.apache.org/jira/browse/FLINK-34419 > Project: Flink > Issue Type: Technical Debt > Components: Build System / CI >Reporter: Matthias Pohl >Assignee: Muhammet Orazov >Priority: Major > Labels: pull-request-available, starter > > [.github/workflows/snapshot.yml|https://github.com/apache/flink-docker/blob/master/.github/workflows/snapshot.yml#L40] > needs to be updated: JDK 17 support was added in 1.18 (FLINK-15736). JDK 21 > support was added in 1.19 (FLINK-33163) -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 [flink-docker]
XComp merged PR #184: URL: https://github.com/apache/flink-docker/pull/184 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]
XComp merged PR #183: URL: https://github.com/apache/flink-docker/pull/183 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]
XComp merged PR #182: URL: https://github.com/apache/flink-docker/pull/182 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34419) flink-docker's .github/workflows/snapshot.yml doesn't support JDK 17 and 21
[ https://issues.apache.org/jira/browse/FLINK-34419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831391#comment-17831391 ] Matthias Pohl commented on FLINK-34419: --- master: 9e0041a2c9dace4bf3f32815e3e24e24385b179b dev-master: tba dev-1.19: tba dev-1.18: tba > flink-docker's .github/workflows/snapshot.yml doesn't support JDK 17 and 21 > --- > > Key: FLINK-34419 > URL: https://issues.apache.org/jira/browse/FLINK-34419 > Project: Flink > Issue Type: Technical Debt > Components: Build System / CI >Reporter: Matthias Pohl >Assignee: Muhammet Orazov >Priority: Major > Labels: pull-request-available, starter > > [.github/workflows/snapshot.yml|https://github.com/apache/flink-docker/blob/master/.github/workflows/snapshot.yml#L40] > needs to be updated: JDK 17 support was added in 1.18 (FLINK-15736). JDK 21 > support was added in 1.19 (FLINK-33163) -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34419][docker] Added support of JDK 17 & 21 for Flink 1.18+ [flink-docker]
XComp merged PR #181: URL: https://github.com/apache/flink-docker/pull/181 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]
mxm commented on code in PR #806: URL: https://github.com/apache/flink-kubernetes-operator/pull/806#discussion_r1541273623 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java: ## @@ -306,34 +310,42 @@ protected Map getVertexResources( } /** - * Scale JM deployment to zero to gracefully stop all JM instances before any TMs are stopped. - * This avoids race conditions between JM shutdown and TM shutdown / failure handling. + * Shut down JobManagers gracefully by scaling JM deployment to zero. This avoids race + * conditions between JM shutdown and TM shutdown / failure handling. * * @param jmDeployment * @param namespace * @param clusterId - * @param timeout + * @param remainingTimeout * @return Remaining timeout after the operation. */ -private Duration scaleJmToZeroBlocking( +private Duration shutdownJobManagers( EditReplacePatchable jmDeployment, String namespace, String clusterId, -Duration timeout) { -return deleteBlocking( -"Scaling JobManager Deployment to zero", -() -> { -try { - jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO); -} catch (Exception ignore) { -// Ignore all errors here as this is an optional step -return null; -} -return kubernetesClient -.pods() -.inNamespace(namespace) - .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId)); -}, -timeout); +Duration remainingTimeout) { + +// We use only half of the shutdown timeout but at most one minute as the main point +// here is to initiate JM shutdown before the TMs +var jmShutdownTimeout = +ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, remainingTimeout.dividedBy(2)); Review Comment: I'm proposing to keep the core deletion logic as in the main branch, together with the other changes in this PR (condition on Foreground deletion). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]
mxm commented on code in PR #806: URL: https://github.com/apache/flink-kubernetes-operator/pull/806#discussion_r1541258264 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java: ## @@ -306,34 +310,42 @@ protected Map getVertexResources( } /** - * Scale JM deployment to zero to gracefully stop all JM instances before any TMs are stopped. - * This avoids race conditions between JM shutdown and TM shutdown / failure handling. + * Shut down JobManagers gracefully by scaling JM deployment to zero. This avoids race + * conditions between JM shutdown and TM shutdown / failure handling. * * @param jmDeployment * @param namespace * @param clusterId - * @param timeout + * @param remainingTimeout * @return Remaining timeout after the operation. */ -private Duration scaleJmToZeroBlocking( +private Duration shutdownJobManagers( EditReplacePatchable jmDeployment, String namespace, String clusterId, -Duration timeout) { -return deleteBlocking( -"Scaling JobManager Deployment to zero", -() -> { -try { - jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO); -} catch (Exception ignore) { -// Ignore all errors here as this is an optional step -return null; -} -return kubernetesClient -.pods() -.inNamespace(namespace) - .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId)); -}, -timeout); +Duration remainingTimeout) { + +// We use only half of the shutdown timeout but at most one minute as the main point +// here is to initiate JM shutdown before the TMs +var jmShutdownTimeout = +ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, remainingTimeout.dividedBy(2)); Review Comment: I think I understand what the new delete behavior (in the main branch) is trying to solve. I'm just saying that there is no need to time out the scale down request after half the timeout (or after one minute). The current behavior (in the main branch) is sufficient and avoids unnecessary complexity. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]
mxm commented on code in PR #806: URL: https://github.com/apache/flink-kubernetes-operator/pull/806#discussion_r1541258264 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java: ## @@ -306,34 +310,42 @@ protected Map getVertexResources( } /** - * Scale JM deployment to zero to gracefully stop all JM instances before any TMs are stopped. - * This avoids race conditions between JM shutdown and TM shutdown / failure handling. + * Shut down JobManagers gracefully by scaling JM deployment to zero. This avoids race + * conditions between JM shutdown and TM shutdown / failure handling. * * @param jmDeployment * @param namespace * @param clusterId - * @param timeout + * @param remainingTimeout * @return Remaining timeout after the operation. */ -private Duration scaleJmToZeroBlocking( +private Duration shutdownJobManagers( EditReplacePatchable jmDeployment, String namespace, String clusterId, -Duration timeout) { -return deleteBlocking( -"Scaling JobManager Deployment to zero", -() -> { -try { - jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO); -} catch (Exception ignore) { -// Ignore all errors here as this is an optional step -return null; -} -return kubernetesClient -.pods() -.inNamespace(namespace) - .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId)); -}, -timeout); +Duration remainingTimeout) { + +// We use only half of the shutdown timeout but at most one minute as the main point +// here is to initiate JM shutdown before the TMs +var jmShutdownTimeout = +ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, remainingTimeout.dividedBy(2)); Review Comment: I think I understand what the new behavior (in the main branch) is trying to solve. I'm just saying that there is no need to time out the scale down request after half the timeout (or after one minute). The old behavior (in the main branch) is sufficient and avoids unnecessary complexity. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]
morazow commented on PR #182: URL: https://github.com/apache/flink-docker/pull/182#issuecomment-2022958489 Thanks @XComp, I have updated both pull requests -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]
mxm commented on code in PR #806: URL: https://github.com/apache/flink-kubernetes-operator/pull/806#discussion_r1541258264 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java: ## @@ -306,34 +310,42 @@ protected Map getVertexResources( } /** - * Scale JM deployment to zero to gracefully stop all JM instances before any TMs are stopped. - * This avoids race conditions between JM shutdown and TM shutdown / failure handling. + * Shut down JobManagers gracefully by scaling JM deployment to zero. This avoids race + * conditions between JM shutdown and TM shutdown / failure handling. * * @param jmDeployment * @param namespace * @param clusterId - * @param timeout + * @param remainingTimeout * @return Remaining timeout after the operation. */ -private Duration scaleJmToZeroBlocking( +private Duration shutdownJobManagers( EditReplacePatchable jmDeployment, String namespace, String clusterId, -Duration timeout) { -return deleteBlocking( -"Scaling JobManager Deployment to zero", -() -> { -try { - jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO); -} catch (Exception ignore) { -// Ignore all errors here as this is an optional step -return null; -} -return kubernetesClient -.pods() -.inNamespace(namespace) - .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId)); -}, -timeout); +Duration remainingTimeout) { + +// We use only half of the shutdown timeout but at most one minute as the main point +// here is to initiate JM shutdown before the TMs +var jmShutdownTimeout = +ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, remainingTimeout.dividedBy(2)); Review Comment: I think I understand what the new behavior is trying to solve. I'm just saying that there is no need to time out the scale down request after half the timeout (or after one minute). The old behavior (in the main branch) is sufficient and avoids unnecessary complexity. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.18][FLINK-34897][test] Enables JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip again. [flink]
XComp merged PR #24577: URL: https://github.com/apache/flink/pull/24577 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.19][FLINK-34897][test] Enables JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip again. [flink]
XComp merged PR #24576: URL: https://github.com/apache/flink/pull/24576 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34897][test] Enables JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip again. [flink]
XComp merged PR #24546: URL: https://github.com/apache/flink/pull/24546 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-34897) JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip needs to be enabled again
[ https://issues.apache.org/jira/browse/FLINK-34897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl resolved FLINK-34897. --- Fix Version/s: 1.18.2 1.20.0 1.19.1 Resolution: Fixed master: [0e70d89ad9f807a5816290e9808720e71bdad655|https://github.com/apache/flink/commit/0e70d89ad9f807a5816290e9808720e71bdad655] 1.19: [6b5c48ff53ddc6e75056a9050afded2ac44a413a|https://github.com/apache/flink/commit/6b5c48ff53ddc6e75056a9050afded2ac44a413a] 1.18: [a6aa569f5005041934a2e6398b6749584beeaabd|https://github.com/apache/flink/commit/a6aa569f5005041934a2e6398b6749584beeaabd] > JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip > needs to be enabled again > -- > > Key: FLINK-34897 > URL: https://issues.apache.org/jira/browse/FLINK-34897 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Coordination >Affects Versions: 1.19.0, 1.18.1, 1.20.0 >Reporter: Matthias Pohl >Assignee: Matthias Pohl >Priority: Major > Labels: pull-request-available > Fix For: 1.18.2, 1.20.0, 1.19.1 > > > While working on FLINK-34672 I noticed that > {{JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip}} > is disabled without a reason. > It looks like I disabled it accidentally as part of FLINK-31783. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
XComp commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1541160078 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java: ## @@ -34,66 +35,105 @@ import org.apache.flink.table.sinks.CsvTableSink; import org.apache.flink.table.sources.InputFormatTableSource; import org.apache.flink.table.types.DataType; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.test.resources.ResourceTestUtils; import org.apache.flink.types.Row; +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.UUID; -/** - * End-to-end test for batch SQL queries. - * - * The sources are generated and bounded. The result is always constant. - * - * Parameters: -outputPath output file path for CsvTableSink; -sqlStatement SQL statement that - * will be executed as executeSql - */ -public class BatchSQLTestProgram { +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** End-to-End tests for Batch SQL tests. */ +@ExtendWith(TestLoggerExtension.class) +public class BatchSQLTest { Review Comment: ```suggestion class BatchSQLTest { ``` Junit5 allows for test classes to be package-protected. This will enable you to remove the JavaDoc and still comply to checkstyle. The JavaDoc itself doesn't add much value. ## flink-end-to-end-tests/flink-batch-sql-test/src/test/resources/log4j2-test.properties: ## @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level=OFF +rootLogger.appenderRef.test.ref=TestLogger +appender.testlogger.name=TestLogger +appender.testlogger.type=CONSOLE +appender.testlogger.target=SYSTEM_ERR +appender.testlogger.layout.type=PatternLayout +appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n +# Uncomment to enable codegen logging +#loggers = testlogger +#logger.testlogger.name =org.apache.flink.table.planner.codegen +#logger.testlogger.level = TRACE +#logger.testlogger.appenderRefs = TestLogger Review Comment: ```suggestion # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes rootLogger.level=OFF rootLogger.appenderRef.test.ref=TestLogger appender.testlogger.name=TestLogger appender.testlogger.type=CONSOLE appender.testlogger.target=SYSTEM_ERR appender.testlogger.layout.type=PatternLayout appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n # Uncomment to enable codegen logging #loggers = testlogger #logger.testlogger.name =org.apache.flink.table.planner.codegen #logger.testlogger.level = TRACE #logger.testlogger.appenderRefs = TestLogger ``` Then let's add some empty lines in between ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java: ## @@ -34,66 +35,105 @@ import org.apache.flink.table.sinks.CsvTableSink; import org.apache.flink.table.sources.InputFormatTableSource; import org.apache.flink.table.types.DataType; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.test.resources.ResourceTestUtils; import org.apache.flink.types.Row; +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir;
[jira] [Commented] (FLINK-34953) Add github ci for flink-web to auto commit build files
[ https://issues.apache.org/jira/browse/FLINK-34953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831367#comment-17831367 ] Zhongqiang Gong commented on FLINK-34953: - [~martijnvisser] Thanks for your patience in explaining. > Add github ci for flink-web to auto commit build files > -- > > Key: FLINK-34953 > URL: https://issues.apache.org/jira/browse/FLINK-34953 > Project: Flink > Issue Type: Improvement > Components: Project Website >Reporter: Zhongqiang Gong >Priority: Minor > Labels: website > > Currently, https://github.com/apache/flink-web commits build files from local > builds, so I want to use GitHub CI to build the docs and commit them. > > Changes: > * Add a website build check for PRs > * Automatically build and commit build files after a PR is merged to `asf-site` > * Optional: this CI can be triggered manually -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34953) Add github ci for flink-web to auto commit build files
[ https://issues.apache.org/jira/browse/FLINK-34953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831359#comment-17831359 ] Martijn Visser commented on FLINK-34953: [~gongzhongqiang] ASF policies don't allow anyone besides committers (not even CI) to commit files, so this can't happen. > Add github ci for flink-web to auto commit build files > -- > > Key: FLINK-34953 > URL: https://issues.apache.org/jira/browse/FLINK-34953 > Project: Flink > Issue Type: Improvement > Components: Project Website >Reporter: Zhongqiang Gong >Priority: Minor > Labels: website > > Currently, https://github.com/apache/flink-web commits build files from local > builds, so I want to use GitHub CI to build the docs and commit them. > > Changes: > * Add a website build check for PRs > * Automatically build and commit build files after a PR is merged to `asf-site` > * Optional: this CI can be triggered manually -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-34953) Add github ci for flink-web to auto commit build files
[ https://issues.apache.org/jira/browse/FLINK-34953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-34953. -- Resolution: Invalid > Add github ci for flink-web to auto commit build files > -- > > Key: FLINK-34953 > URL: https://issues.apache.org/jira/browse/FLINK-34953 > Project: Flink > Issue Type: Improvement > Components: Project Website >Reporter: Zhongqiang Gong >Priority: Minor > Labels: website > > Currently, https://github.com/apache/flink-web commits build files from local > builds, so I want to use GitHub CI to build the docs and commit them. > > Changes: > * Add a website build check for PRs > * Automatically build and commit build files after a PR is merged to `asf-site` > * Optional: this CI can be triggered manually -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]
MartijnVisser commented on PR #24526: URL: https://github.com/apache/flink/pull/24526#issuecomment-2022804099 > What is your opinion on how the function should behave? I've taken a look at how INTERSECT is defined in the SQL standard. Based on https://stackoverflow.com/questions/59060599/does-intersect-operator-exist-in-the-sql-standar, https://www.postgresql.org/docs/current/queries-union.html, and the fact that Calcite differentiates between INTERSECT and INTERSECT ALL, I believe that the default behavior of INTERSECT is to remove duplicates. So the result of `[1, 1, 1, 2] INTERSECT [1, 1, 2]` should be `[1, 2]` in my understanding. I think that Spark/Databricks/Presto are performing the correct behavior. BigQuery and Redshift don't support ARRAY_INTERSECT. ksqlDB follows the same behavior as Spark/Databricks/Presto per https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/scalar-functions/#array_intersect. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
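To make the two semantics concrete: under set semantics, `[1, 1, 1, 2] INTERSECT [1, 1, 2]` yields `[1, 2]`; under multiset (`INTERSECT ALL`) semantics each element is kept up to its minimum multiplicity, giving `[1, 1, 2]`. A small sketch of the distinct variant (illustrative only, not the PR's implementation):
```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ArrayIntersectSemantics {

    // set semantics: duplicates removed, left-to-right order preserved
    static List<Integer> intersectDistinct(List<Integer> left, List<Integer> right) {
        Set<Integer> rightSet = new HashSet<>(right);
        return left.stream().distinct().filter(rightSet::contains).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(intersectDistinct(Arrays.asList(1, 1, 1, 2), Arrays.asList(1, 1, 2)));
        // -> [1, 2]
    }
}
```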
Re: [PR] [FLINK-33376][coordination] Extend ZooKeeper Curator configurations [flink]
XComp commented on code in PR #24563:
URL: https://github.com/apache/flink/pull/24563#discussion_r1541117995

## flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java:
## @@ -246,6 +249,38 @@ public static CuratorFrameworkWithUnhandledErrorListener startCuratorFramework(
                 .ensembleTracker(ensembleTracking)
                 .aclProvider(aclProvider);

+        if (configuration.contains(HighAvailabilityOptions.ZOOKEEPER_CLIENT_AUTHORIZATION)) {
+            Map<String, String> authMap =
+                    configuration.get(HighAvailabilityOptions.ZOOKEEPER_CLIENT_AUTHORIZATION);
+            List<AuthInfo> authInfos =
+                    authMap.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            new AuthInfo(
+                                                    entry.getKey(),
+                                                    entry.getValue()
+                                                            .getBytes(
+                                                                    ConfigConstants
+                                                                            .DEFAULT_CHARSET)))
+                            .collect(Collectors.toList());
+            curatorFrameworkBuilder.authorization(authInfos);
+        }
+
+        if (configuration.contains(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT_MS)) {
+            curatorFrameworkBuilder.maxCloseWaitMs(
+                    (int)
+                            configuration
+                                    .get(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT_MS)
+                                    .toMillis());

Review Comment:
   You have a point there; it is admittedly quite nitpicky. But on the other hand, it's also quite unlikely that a user would set such a timeout, so an exception wouldn't hurt. :shrug:

## flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java:
## @@ -246,6 +249,38 @@ public static CuratorFrameworkWithUnhandledErrorListener startCuratorFramework(
+        if (configuration.contains(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT)) {
+            long maxCloseWait = configuration.get(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT).toMillis();
+            if (maxCloseWait < Integer.MIN_VALUE || maxCloseWait > Integer.MAX_VALUE) {
+                throw new IllegalConfigurationException(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT.key() + " in ms is not an integer - " + maxCloseWait);

Review Comment:
```suggestion
            if (maxCloseWait < 0 || maxCloseWait > Integer.MAX_VALUE) {
                throw new IllegalConfigurationException(
                        "The value (%d ms) is out-of-range for %s. The milliseconds timeout is expected to be between 0 and %d ms.",
                        maxCloseWait,
                        HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT.key(),
                        Integer.MAX_VALUE);
            }
```
   nit: since there is a constructor for formatted strings.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] [FLINK-33970][jdbc][docs] Remove dead link [flink-connector-jdbc]
GOODBOY008 commented on PR #88:
URL: https://github.com/apache/flink-connector-jdbc/pull/88#issuecomment-2022772500

@MartijnVisser PTAL, thank you~

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] [FLINK-34934] Translation Flink-Kubernetes-Operator document framework construction [flink-kubernetes-operator]
caicancai commented on PR #805: URL: https://github.com/apache/flink-kubernetes-operator/pull/805#issuecomment-2022758286 @1996fanrui done ![2024-03-27 21-24-15屏幕截图](https://github.com/apache/flink-kubernetes-operator/assets/77189278/c2973efd-a1af-4fc4-afbb-08f53c7fdbeb) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]
gyfora commented on code in PR #806:
URL: https://github.com/apache/flink-kubernetes-operator/pull/806#discussion_r1541081443

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
## @@ -306,34 +310,42 @@ protected Map getVertexResources(
     }

     /**
-     * Scale JM deployment to zero to gracefully stop all JM instances before any TMs are stopped.
-     * This avoids race conditions between JM shutdown and TM shutdown / failure handling.
+     * Shut down JobManagers gracefully by scaling JM deployment to zero. This avoids race
+     * conditions between JM shutdown and TM shutdown / failure handling.
      *
      * @param jmDeployment
      * @param namespace
      * @param clusterId
-     * @param timeout
+     * @param remainingTimeout
      * @return Remaining timeout after the operation.
      */
-    private Duration scaleJmToZeroBlocking(
+    private Duration shutdownJobManagers(
            EditReplacePatchable jmDeployment,
            String namespace,
            String clusterId,
-           Duration timeout) {
-        return deleteBlocking(
-                "Scaling JobManager Deployment to zero",
-                () -> {
-                    try {
-                        jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO);
-                    } catch (Exception ignore) {
-                        // Ignore all errors here as this is an optional step
-                        return null;
-                    }
-                    return kubernetesClient
-                            .pods()
-                            .inNamespace(namespace)
-                            .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId));
-                },
-                timeout);
+           Duration remainingTimeout) {
+
+        // We use only half of the shutdown timeout but at most one minute as the main point
+        // here is to initiate JM shutdown before the TMs
+        var jmShutdownTimeout =
+                ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, remainingTimeout.dividedBy(2));

Review Comment:
   The reason why clean blocking deletion is always preferred is due to issues like https://issues.apache.org/jira/browse/FLINK-32334. It is always better to wait for the deployments/pods to go away before doing anything; otherwise strange things can happen, like JMs/TMs reconnecting to new clusters, etc.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
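To illustrate the timeout rule quoted in the diff above (half of the remaining shutdown budget, capped at one minute), here is a small standalone sketch. The one-minute value of JM_SHUTDOWN_MAX_WAIT mirrors the comment in the diff and is an assumption, not necessarily the PR's final constant:

```java
import java.time.Duration;
import org.apache.commons.lang3.ObjectUtils;

public class JmShutdownTimeoutDemo {

    // Assumed cap, taken from the "at most one minute" comment in the diff.
    private static final Duration JM_SHUTDOWN_MAX_WAIT = Duration.ofMinutes(1);

    static Duration jmShutdownTimeout(Duration remainingTimeout) {
        // Half of whatever shutdown budget is left, but never more than the cap,
        // so the TM shutdown phase still gets the bulk of the budget.
        return ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, remainingTimeout.dividedBy(2));
    }

    public static void main(String[] args) {
        System.out.println(jmShutdownTimeout(Duration.ofMinutes(5)));  // PT1M (capped)
        System.out.println(jmShutdownTimeout(Duration.ofSeconds(90))); // PT45S (half)
    }
}
```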
Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]
gyfora commented on code in PR #806:
URL: https://github.com/apache/flink-kubernetes-operator/pull/806#discussion_r1541074102

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
## @@ -306,34 +310,42 @@ protected Map getVertexResources(
     }

     /**
-     * Scale JM deployment to zero to gracefully stop all JM instances before any TMs are stopped.
-     * This avoids race conditions between JM shutdown and TM shutdown / failure handling.
+     * Shut down JobManagers gracefully by scaling JM deployment to zero. This avoids race
+     * conditions between JM shutdown and TM shutdown / failure handling.
      *
      * @param jmDeployment
      * @param namespace
      * @param clusterId
-     * @param timeout
+     * @param remainingTimeout
      * @return Remaining timeout after the operation.
      */
-    private Duration scaleJmToZeroBlocking(
+    private Duration shutdownJobManagers(
            EditReplacePatchable jmDeployment,
            String namespace,
            String clusterId,
-           Duration timeout) {
-        return deleteBlocking(
-                "Scaling JobManager Deployment to zero",
-                () -> {
-                    try {
-                        jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO);
-                    } catch (Exception ignore) {
-                        // Ignore all errors here as this is an optional step
-                        return null;
-                    }
-                    return kubernetesClient
-                            .pods()
-                            .inNamespace(namespace)
-                            .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId));
-                },
-                timeout);
+           Duration remainingTimeout) {
+
+        // We use only half of the shutdown timeout but at most one minute as the main point
+        // here is to initiate JM shutdown before the TMs
+        var jmShutdownTimeout =
+                ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, remainingTimeout.dividedBy(2));

Review Comment:
   This is not really user-facing logic, in the sense that the only thing the user cares about is a clean shutdown within the configured total shutdown timeout.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] [FLINK-33376][coordination] Extend ZooKeeper Curator configurations [flink]
JTaky commented on code in PR #24563:
URL: https://github.com/apache/flink/pull/24563#discussion_r1541075870

## flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java:
## @@ -246,6 +249,38 @@ public static CuratorFrameworkWithUnhandledErrorListener startCuratorFramework(
                 .ensembleTracker(ensembleTracking)
                 .aclProvider(aclProvider);

+        if (configuration.contains(HighAvailabilityOptions.ZOOKEEPER_CLIENT_AUTHORIZATION)) {
+            Map<String, String> authMap =
+                    configuration.get(HighAvailabilityOptions.ZOOKEEPER_CLIENT_AUTHORIZATION);
+            List<AuthInfo> authInfos =
+                    authMap.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            new AuthInfo(
+                                                    entry.getKey(),
+                                                    entry.getValue()
+                                                            .getBytes(
+                                                                    ConfigConstants
+                                                                            .DEFAULT_CHARSET)))
+                            .collect(Collectors.toList());
+            curatorFrameworkBuilder.authorization(authInfos);
+        }
+
+        if (configuration.contains(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT_MS)) {
+            curatorFrameworkBuilder.maxCloseWaitMs(
+                    (int)
+                            configuration
+                                    .get(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT_MS)
+                                    .toMillis());

Review Comment:
   I am hesitant to throw an exception here, but it can probably help somebody who puts in a wrong duration value, e.g. days (Integer.MAX_VALUE in ms is more than 500 hours)?
```
if (maxCloseWait < Integer.MIN_VALUE || maxCloseWait > Integer.MAX_VALUE) {
    throw new IllegalConfigurationException(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT.key() + " in ms is not an integer - " + maxCloseWait);
}
```
   As an alternative, we could log a WARN and silently cap the value at Integer.MAX_VALUE, though it is generally bad practice to make something implicit.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
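For comparison, a minimal sketch of the WARN-and-clamp alternative mentioned above; the method name and logging here are illustrative, not the PR's final code:

```java
import java.time.Duration;

public class MaxCloseWaitDemo {

    // Clamp a Duration to the int-millisecond range instead of throwing,
    // logging a warning when the configured value is out of range.
    static int toClampedMillis(Duration configured) {
        long millis = configured.toMillis();
        if (millis < 0 || millis > Integer.MAX_VALUE) {
            System.err.printf(
                    "WARN: max close wait of %d ms is out of range, clamping to [0, %d]%n",
                    millis, Integer.MAX_VALUE);
            return millis < 0 ? 0 : Integer.MAX_VALUE;
        }
        return (int) millis;
    }

    public static void main(String[] args) {
        System.out.println(toClampedMillis(Duration.ofDays(30)));  // clamped to 2147483647
        System.out.println(toClampedMillis(Duration.ofSeconds(5))); // 5000
    }
}
```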
Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]
gyfora commented on code in PR #806:
URL: https://github.com/apache/flink-kubernetes-operator/pull/806#discussion_r1541072245

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
## @@ -306,34 +310,42 @@ protected Map getVertexResources(
     }

     /**
-     * Scale JM deployment to zero to gracefully stop all JM instances before any TMs are stopped.
-     * This avoids race conditions between JM shutdown and TM shutdown / failure handling.
+     * Shut down JobManagers gracefully by scaling JM deployment to zero. This avoids race
+     * conditions between JM shutdown and TM shutdown / failure handling.
      *
      * @param jmDeployment
      * @param namespace
      * @param clusterId
-     * @param timeout
+     * @param remainingTimeout
      * @return Remaining timeout after the operation.
      */
-    private Duration scaleJmToZeroBlocking(
+    private Duration shutdownJobManagers(
            EditReplacePatchable jmDeployment,
            String namespace,
            String clusterId,
-           Duration timeout) {
-        return deleteBlocking(
-                "Scaling JobManager Deployment to zero",
-                () -> {
-                    try {
-                        jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO);
-                    } catch (Exception ignore) {
-                        // Ignore all errors here as this is an optional step
-                        return null;
-                    }
-                    return kubernetesClient
-                            .pods()
-                            .inNamespace(namespace)
-                            .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId));
-                },
-                timeout);
+           Duration remainingTimeout) {
+
+        // We use only half of the shutdown timeout but at most one minute as the main point
+        // here is to initiate JM shutdown before the TMs
+        var jmShutdownTimeout =
+                ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, remainingTimeout.dividedBy(2));

Review Comment:
   I don't fully understand what you mean by non-blocking delete. What we gain here by shutting down the JM first is cleaner logs and a generally faster shutdown for the TM pods, which overall leads to a more stable shutdown for the entire job.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]
mxm commented on code in PR #806:
URL: https://github.com/apache/flink-kubernetes-operator/pull/806#discussion_r1541062926

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
## @@ -306,34 +310,42 @@ protected Map getVertexResources(
     }

     /**
-     * Scale JM deployment to zero to gracefully stop all JM instances before any TMs are stopped.
-     * This avoids race conditions between JM shutdown and TM shutdown / failure handling.
+     * Shut down JobManagers gracefully by scaling JM deployment to zero. This avoids race
+     * conditions between JM shutdown and TM shutdown / failure handling.
      *
      * @param jmDeployment
      * @param namespace
      * @param clusterId
-     * @param timeout
+     * @param remainingTimeout
      * @return Remaining timeout after the operation.
      */
-    private Duration scaleJmToZeroBlocking(
+    private Duration shutdownJobManagers(
            EditReplacePatchable jmDeployment,
            String namespace,
            String clusterId,
-           Duration timeout) {
-        return deleteBlocking(
-                "Scaling JobManager Deployment to zero",
-                () -> {
-                    try {
-                        jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO);
-                    } catch (Exception ignore) {
-                        // Ignore all errors here as this is an optional step
-                        return null;
-                    }
-                    return kubernetesClient
-                            .pods()
-                            .inNamespace(namespace)
-                            .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId));
-                },
-                timeout);
+           Duration remainingTimeout) {
+
+        // We use only half of the shutdown timeout but at most one minute as the main point
+        // here is to initiate JM shutdown before the TMs
+        var jmShutdownTimeout =
+                ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, remainingTimeout.dividedBy(2));

Review Comment:
   This is complicating the logic quite a bit and may be non-obvious to users. What do we really gain from deleting the JM deployment earlier? The TaskManager may be recycled quicker, but the JM shutdown speed isn't affected.

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
## @@ -306,34 +310,42 @@ protected Map getVertexResources(
-    private Duration scaleJmToZeroBlocking(
+    private Duration shutdownJobManagers(

Review Comment:
```suggestion
    private Duration shutdownJobManagersBlocking(
```
   Just to keep the general pattern, I think it's helpful to the reader.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Comment Edited] (FLINK-34670) The asyncOperationsThreadPool in SubtaskCheckpointCoordinatorImpl can only create one worker thread
[ https://issues.apache.org/jira/browse/FLINK-34670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831334#comment-17831334 ]

Jinzhong Li edited comment on FLINK-34670 at 3/27/24 12:57 PM:
----------------------------------------------------------------

[~roman] [~pnowojski] I think this is a critical bug that will result in concurrent checkpoints being forced to execute sequentially, as well as causing a drastic performance regression of checkpoint aborts. Could you please help confirm this problem?

was (Author: lijinzhong):
[~roman] [~pnowojski] I think this is a critical bug that will result in concurrent checkpoints being forced to execute sequentially, as well as causing a drastic performance regression of checkpoint aborts. Could you please take a look at it?

> The asyncOperationsThreadPool in SubtaskCheckpointCoordinatorImpl can only
> create one worker thread
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-34670
>                 URL: https://issues.apache.org/jira/browse/FLINK-34670
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.18.0, 1.19.0
>            Reporter: Jinzhong Li
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.20.0
>
>         Attachments: image-2024-03-14-20-24-14-198.png,
> image-2024-03-14-20-27-37-540.png, image-2024-03-14-20-33-28-851.png
>
>
> Now, the asyncOperations ThreadPoolExecutor of
> SubtaskCheckpointCoordinatorImpl is created with a LinkedBlockingQueue and
> zero corePoolSize.
> !image-2024-03-14-20-24-14-198.png|width=614,height=198!
> And in the ThreadPoolExecutor, except for the first time a task is
> submitted, *no* new thread is created until the queue is full. But the
> capacity of the LinkedBlockingQueue is Integer.MAX_VALUE. This means that
> there is almost *only one thread* working in this thread pool, *even if*
> {*}there are many concurrent checkpoint requests or checkpoint abort
> requests waiting to be processed{*}.
> !image-2024-03-14-20-27-37-540.png|width=614,height=175!
> This problem can be verified by changing the ExecutorService implementation
> in the UT
> SubtaskCheckpointCoordinatorTest#testNotifyCheckpointAbortedDuringAsyncPhase.
> When a LinkedBlockingQueue and zero corePoolSize are configured, this UT
> will deadlock because only one worker thread can be created.
> !image-2024-03-14-20-33-28-851.png|width=606,height=235!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
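The single-worker behavior described in the issue is easy to reproduce outside Flink; a minimal, self-contained sketch (illustrative, not Flink code):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SingleWorkerDemo {
    public static void main(String[] args) throws InterruptedException {
        // corePoolSize = 0 with an unbounded LinkedBlockingQueue: extra threads
        // are only spawned when offer() fails, which never happens here.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(1_000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        Thread.sleep(100);
        // Despite 4 queued tasks and maxPoolSize = 4, only one worker exists.
        System.out.println("worker threads: " + pool.getPoolSize()); // prints 1
        pool.shutdown();
    }
}
```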
[jira] [Commented] (FLINK-34565) Enhance flink kubernetes configMap to accommodate additional configuration files
[ https://issues.apache.org/jira/browse/FLINK-34565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831336#comment-17831336 ]

Zhu Zhu commented on FLINK-34565:
----------------------------------

IIUC, the requirement is to ship more user files, which may be needed by user code, to the pod. Supporting configuration files is just a special case of it. Shipping them via ConfigMap sounds a bit tricky to me.
cc [~wangyang0918]

> Enhance flink kubernetes configMap to accommodate additional configuration
> files
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-34565
>                 URL: https://issues.apache.org/jira/browse/FLINK-34565
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / Kubernetes
>            Reporter: Surendra Singh Lilhore
>            Priority: Major
>              Labels: pull-request-available
>
> Flink kubernetes client currently supports a fixed number of files
> (flink-conf.yaml, logback-console.xml, log4j-console.properties) in the JM
> and TM Pod ConfigMap. In certain scenarios, particularly in app mode,
> additional configuration files are required for jobs to run successfully.
> Presently, users must resort to workarounds to include dynamic configuration
> files in the JM and TM. This proposed improvement allows users to easily add
> extra files by configuring the
> '{*}kubernetes.flink.configmap.additional.resources{*}' property. Users can
> provide a semicolon-separated list of local files in the client Flink config
> directory that should be included in the Flink ConfigMap.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
Re: [PR] [FLINK-33982][core] Introduce new config options for Batch Job Recovery [flink]
JunRuiLee commented on PR #24026:
URL: https://github.com/apache/flink/pull/24026#issuecomment-2022702614

@zhuzhurk Thanks for your review. I've updated this PR accordingly, PTAL.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Commented] (FLINK-34670) The asyncOperationsThreadPool in SubtaskCheckpointCoordinatorImpl can only create one worker thread
[ https://issues.apache.org/jira/browse/FLINK-34670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831334#comment-17831334 ]

Jinzhong Li commented on FLINK-34670:
--------------------------------------

[~roman] [~pnowojski] I think this is a critical bug that will result in concurrent checkpoints being forced to execute sequentially, as well as causing a drastic performance regression of checkpoint aborts. Could you please take a look at it?

> The asyncOperationsThreadPool in SubtaskCheckpointCoordinatorImpl can only
> create one worker thread
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-34670
>                 URL: https://issues.apache.org/jira/browse/FLINK-34670
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.18.0, 1.19.0
>            Reporter: Jinzhong Li
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.20.0
>
>         Attachments: image-2024-03-14-20-24-14-198.png,
> image-2024-03-14-20-27-37-540.png, image-2024-03-14-20-33-28-851.png
>
>
> Now, the asyncOperations ThreadPoolExecutor of
> SubtaskCheckpointCoordinatorImpl is created with a LinkedBlockingQueue and
> zero corePoolSize.
> !image-2024-03-14-20-24-14-198.png|width=614,height=198!
> And in the ThreadPoolExecutor, except for the first time a task is
> submitted, *no* new thread is created until the queue is full. But the
> capacity of the LinkedBlockingQueue is Integer.MAX_VALUE. This means that
> there is almost *only one thread* working in this thread pool, *even if*
> {*}there are many concurrent checkpoint requests or checkpoint abort
> requests waiting to be processed{*}.
> !image-2024-03-14-20-27-37-540.png|width=614,height=175!
> This problem can be verified by changing the ExecutorService implementation
> in the UT
> SubtaskCheckpointCoordinatorTest#testNotifyCheckpointAbortedDuringAsyncPhase.
> When a LinkedBlockingQueue and zero corePoolSize are configured, this UT
> will deadlock because only one worker thread can be created.
> !image-2024-03-14-20-33-28-851.png|width=606,height=235!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
Re: [PR] [FLINK-34906] Only scale when all tasks are running [flink-kubernetes-operator]
mxm commented on PR #801:
URL: https://github.com/apache/flink-kubernetes-operator/pull/801#issuecomment-2022693008

Thanks Rui! The changes make sense to me. To Gyula's point, I think we should try to deduplicate the logic such that both the Kubernetes autoscaler and standalone use the same code path.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] [FLINK-34444] Initial implementation of JM operator metric rest api [flink]
mxm commented on code in PR #24564:
URL: https://github.com/apache/flink/pull/24564#discussion_r1541043451

## flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
## @@ -538,19 +556,19 @@ public TaskMetricStore getTaskMetricStore(String taskID) {
     @ThreadSafe
     public static class TaskMetricStore extends ComponentMetricStore {
         private final Map subtasks;
-        private final Map jmOperators;
+        private final ComponentMetricStore jmOperator;

Review Comment:
   After reading the rest of the code, this seems to be the case. Retrieving all the coordinator metrics at once for a task seems reasonable.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] [FLINK-34444] Initial implementation of JM operator metric rest api [flink]
mxm commented on code in PR #24564:
URL: https://github.com/apache/flink/pull/24564#discussion_r1541028016

## flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java:
## @@ -277,11 +277,12 @@ public static Configuration forReporter(Configuration configuration, String repo
      * JobManager of an operator.
      */
     public static final ConfigOption<String> SCOPE_NAMING_JM_OPERATOR =
-            key("metrics.scope.jm-operator")
+            key("metrics.scope.coordinator")

Review Comment:
   Do we want to move ahead with this change? If so, we should also change the naming internally.

## flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
## @@ -538,19 +556,19 @@ public TaskMetricStore getTaskMetricStore(String taskID) {
     @ThreadSafe
     public static class TaskMetricStore extends ComponentMetricStore {
         private final Map subtasks;
-        private final Map jmOperators;
+        private final ComponentMetricStore jmOperator;

Review Comment:
   Can't there be multiple coordinators per task? A task can host multiple operators, each with their own coordinator. Is the idea to coalesce all of them into one metric store?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org