[jira] [Assigned] (FLINK-34958) Add support Flink 1.20-SNAPSHOT and bump flink-connector-parent to 1.1.0 for mongodb connector

2024-03-27 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu reassigned FLINK-34958:
--

Assignee: Zhongqiang Gong

> Add support Flink 1.20-SNAPSHOT and bump flink-connector-parent to 1.1.0 for 
> mongodb connector
> --
>
> Key: FLINK-34958
> URL: https://issues.apache.org/jira/browse/FLINK-34958
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / MongoDB
>Reporter: Zhongqiang Gong
>Assignee: Zhongqiang Gong
>Priority: Minor
>
> Changes:
>  * Add support Flink 1.20-SNAPSHOT
>  * Bump flink-connector-parent to 1.1.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34958) Add support Flink 1.20-SNAPSHOT and bump flink-connector-parent to 1.1.0 for mongodb connector

2024-03-27 Thread Zhongqiang Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831632#comment-17831632
 ] 

Zhongqiang Gong commented on FLINK-34958:
-

[~Leonard] I'm willing to take this.

> Add support Flink 1.20-SNAPSHOT and bump flink-connector-parent to 1.1.0 for 
> mongodb connector
> --
>
> Key: FLINK-34958
> URL: https://issues.apache.org/jira/browse/FLINK-34958
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / MongoDB
>Reporter: Zhongqiang Gong
>Priority: Minor
>
> Changes:
>  * Add support Flink 1.20-SNAPSHOT
>  * Bump flink-connector-parent to 1.1.0





[jira] [Created] (FLINK-34958) Add support Flink 1.20-SNAPSHOT and bump flink-connector-parent to 1.1.0 for mongodb connector

2024-03-27 Thread Zhongqiang Gong (Jira)
Zhongqiang Gong created FLINK-34958:
---

 Summary: Add support Flink 1.20-SNAPSHOT and bump 
flink-connector-parent to 1.1.0 for mongodb connector
 Key: FLINK-34958
 URL: https://issues.apache.org/jira/browse/FLINK-34958
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / MongoDB
Reporter: Zhongqiang Gong


Changes:
 * Add support Flink 1.20-SNAPSHOT
 * Bump flink-connector-parent to 1.1.0





Re: [PR] [FLINK-34934] Translation Flink-Kubernetes-Operator document framework construction [flink-kubernetes-operator]

2024-03-27 Thread via GitHub


caicancai commented on PR #805:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/805#issuecomment-2024447717

   > Thanks @caicancai for the contribution and test!
   > 
   > I run hugo locally, and try to choose English and Chinese doc, it works 
well.
   > 
   > But I noticed #807 is merged, but this PR doesn't include it for Chinese 
doc. It's needed to sync them from English doc to Chinese doc.
   > 
   > After that, this PR is fine for me.
   
   Thank you for the reminder, updated


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34906] Only scale when all tasks are running [flink-kubernetes-operator]

2024-03-27 Thread via GitHub


gyfora commented on PR #801:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/801#issuecomment-2024422087

   sounds good @1996fanrui 





Re: [PR] [FLINK-34924][table] Support partition pushdown for join queries [flink]

2024-03-27 Thread via GitHub


libenchao commented on PR #24559:
URL: https://github.com/apache/flink/pull/24559#issuecomment-2024407235

   It sounds like a good idea to do the optimization, I'll try to give it a 
review in the following days, thank you for the PR!





[jira] [Commented] (FLINK-34656) Generated code for `ITEM` operator should return null when getting element of a null map/array/row

2024-03-27 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831623#comment-17831623
 ] 

Benchao Li commented on FLINK-34656:


[~nilerzhou] Thank you for the test case, it helps a lot; we should indeed fix 
it. The problem seems related to the type 
{{ARRAY(BIGINT.notNull()).nullable()}}: although the inner type is 
{{NOT NULL}}, the outer type is {{NULLABLE}}. There is a similar discussion in 
FLINK-31830 (and a corresponding discussion thread: 
https://lists.apache.org/thread/fzrfc9c3rtgw761ofdydl0q96km558q7).

Since this is a codegen issue which does not affect the user interface, we can 
solve this directly.

> Generated code for `ITEM` operator should return null when getting element of 
> a null map/array/row
> --
>
> Key: FLINK-34656
> URL: https://issues.apache.org/jira/browse/FLINK-34656
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: yisha zhou
>Priority: Major
>
> In FieldAccessFromTableITCase we can find that the expected result of f0[1] 
> is null when f0 is a null array. 
> However, behavior in generated code for ITEM is not consistent with case 
> above. The main code is:
>  
> {code:java}
> val arrayAccessCode =
>   s"""
>  |${array.code}
>  |${index.code}
>  |boolean $nullTerm = ${array.nullTerm} || ${index.nullTerm} ||
>  |   $idxStr < 0 || $idxStr >= ${array.resultTerm}.size() || $arrayIsNull;
>  |$resultTypeTerm $resultTerm = $nullTerm ? $defaultTerm : $arrayGet;
>  |""".stripMargin {code}
> If `array.nullTerm` is true, a default value of element type will be 
> returned, for example -1 for null bigint array.
> The reason why FieldAccessFromTableITCase can get expected result is that the 
> ReduceExpressionsRule generated an expression code for that case like:
> {code:java}
> boolean isNull$0 = true || false ||
>    ((int) 1) - 1 < 0 || ((int) 1) - 1 >= 
> ((org.apache.flink.table.data.ArrayData) null).size() || 
> ((org.apache.flink.table.data.ArrayData) null).isNullAt(((int) 1) - 1);
> long result$0 = isNull$0 ? -1L : ((org.apache.flink.table.data.ArrayData) 
> null).getLong(((int) 1) - 1);
> if (isNull$0) {
>   out.setField(0, null);
> } else {
>   out.setField(0, result$0);
> } {code}
> The reduced expr will be a null literal.
>  
> I think the behaviors for getting element of a null value should be unified.
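
The unified behavior requested in the issue description can be modelled in plain Java. This is only a simplified sketch, not Flink's generated code: `ItemAccessSketch` and `item` are hypothetical names, and a Java `null` stands in for SQL NULL. It checks the null flags before touching the array and returns null, rather than a default such as -1, when the array itself is null.

```java
/** Simplified, hypothetical model of ITEM on an array; not Flink's generated code. */
final class ItemAccessSketch {
    private ItemAccessSketch() {}

    /**
     * Returns the element at the 1-based SQL index, or null when the array is
     * null, the index is null, the index is out of range, or the element is null.
     */
    static Long item(Long[] array, Integer index) {
        // Check the null flags first so a null array is never dereferenced
        // and no element-type default value leaks out as the result.
        if (array == null || index == null) {
            return null;
        }
        int idx = index - 1; // the generated code above also subtracts 1 from the index
        if (idx < 0 || idx >= array.length) {
            return null;
        }
        return array[idx]; // may itself be null for a nullable element
    }
}
```

With this model, `ItemAccessSketch.item(null, 1)` yields null, which matches the result that FieldAccessFromTableITCase expects for `f0[1]` on a null array.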





Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]

2024-03-27 Thread via GitHub


gyfora commented on code in PR #806:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/806#discussion_r1542314747


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
##
@@ -306,34 +310,42 @@ protected Map 
getVertexResources(
 }
 
 /**
- * Scale JM deployment to zero to gracefully stop all JM instances before 
any TMs are stopped.
- * This avoids race conditions between JM shutdown and TM shutdown / 
failure handling.
+ * Shut down JobManagers gracefully by scaling JM deployment to zero. This 
avoids race
+ * conditions between JM shutdown and TM shutdown / failure handling.
  *
  * @param jmDeployment
  * @param namespace
  * @param clusterId
- * @param timeout
+ * @param remainingTimeout
  * @return Remaining timeout after the operation.
  */
-private Duration scaleJmToZeroBlocking(
+private Duration shutdownJobManagers(

Review Comment:
   I will change this, makes sense :) 
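
   The Javadoc in the hunk above says the method returns the remaining timeout after the operation. A minimal sketch of that bookkeeping follows; `RemainingTimeoutSketch` and `remaining` are hypothetical names, and clamping at zero is an assumption rather than something the PR states.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper illustrating "remaining timeout" bookkeeping; not operator code. */
final class RemainingTimeoutSketch {
    private RemainingTimeoutSketch() {}

    /**
     * Subtracts the time spent between start and now from the given timeout.
     * Assumption: the result is clamped at zero instead of going negative.
     */
    static Duration remaining(Duration timeout, Instant start, Instant now) {
        Duration elapsed = Duration.between(start, now);
        Duration left = timeout.minus(elapsed);
        return left.isNegative() ? Duration.ZERO : left;
    }
}
```

   A caller would record the start time before the blocking step and pass the returned value on as the budget for the next step.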






Re: [PR] [FLINK-34529][table-planner] Introduce FlinkProjectWindowTransposeRule. [flink]

2024-03-27 Thread via GitHub


libenchao commented on code in PR #24567:
URL: https://github.com/apache/flink/pull/24567#discussion_r1542265389


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##
@@ -186,6 +187,27 @@ class FlinkRelMdUpsertKeys private extends 
MetadataHandler[UpsertKeys] {
   }
 
   def getUpsertKeys(rel: Window, mq: RelMetadataQuery): JSet[ImmutableBitSet] 
= {
+// If it's a ROW_NUMBER rank, then the upsert keys are partition by key 
and order key.

Review Comment:
   the upsert keys should be : partition key + order key + rownumber column?



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##
@@ -186,6 +187,27 @@ class FlinkRelMdUpsertKeys private extends 
MetadataHandler[UpsertKeys] {
   }
 
   def getUpsertKeys(rel: Window, mq: RelMetadataQuery): JSet[ImmutableBitSet] 
= {

Review Comment:
   And since you changed `FlinkRelMdUniqueKeys`, it would be good to also add 
tests for it in `FlinkRelMdUniqueKeysTest`



##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala:
##
@@ -656,18 +656,14 @@ class RankITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
   "(true,1,book,b,3,2)",
   "(true,2,fruit,a,2,1)",
   "(true,3,book,a,1,2)",
-  "(true,3,book,a,1,2)",

Review Comment:
   Is this change expected?



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##
@@ -186,6 +187,27 @@ class FlinkRelMdUpsertKeys private extends 
MetadataHandler[UpsertKeys] {
   }
 
   def getUpsertKeys(rel: Window, mq: RelMetadataQuery): JSet[ImmutableBitSet] 
= {
+// If it's a ROW_NUMBER rank, then the upsert keys are partition by key 
and order key.
+if (rel.groups.length == 1) {

Review Comment:
   `Window` can have multiple groups; if any one of them contains `row_number`, 
it could be an upsert key candidate



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##
@@ -186,6 +187,27 @@ class FlinkRelMdUpsertKeys private extends 
MetadataHandler[UpsertKeys] {
   }
 
   def getUpsertKeys(rel: Window, mq: RelMetadataQuery): JSet[ImmutableBitSet] 
= {

Review Comment:
   It seems like a good improvement to both unique key and upsert key metadata, 
do you think it's possible to put it in `FlinkRelMdUniqueKeys` and reuse it in 
`FlinkRelMdUpsertKeys`, just like `FlinkRelMdUniqueKeys.getRankUniqueKeys`?



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectWindowTransposeRule.java:
##
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.rules.TransformationRule;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.util.BitSets;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.immutables.value.Value;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * 

Re: [PR] [build] fix inconsistent Kafka shading among cdc connectors [flink-cdc]

2024-03-27 Thread via GitHub


link3280 commented on PR #2988:
URL: https://github.com/apache/flink-cdc/pull/2988#issuecomment-2024362933

   cc @PatrickRen 





[jira] [Created] (FLINK-34957) JDBC Autoscaler event handler throws Column 'message' cannot be null

2024-03-27 Thread Rui Fan (Jira)
Rui Fan created FLINK-34957:
---

 Summary: JDBC Autoscaler event handler throws Column 'message' 
cannot be null 
 Key: FLINK-34957
 URL: https://issues.apache.org/jira/browse/FLINK-34957
 Project: Flink
  Issue Type: Bug
  Components: Autoscaler
Reporter: Rui Fan
Assignee: Rui Fan
 Fix For: kubernetes-operator-1.9.0
 Attachments: image-2024-03-28-11-57-35-234.png

JDBC Autoscaler event handler doesn't allow the event message to be null, but 
the message may be null when we handle the exception.

We currently use the exception message as the event message, but the exception 
message may be null, for example for a TimeoutException. (This is shown in the 
following picture.)

Also, recording an event without any message is meaningless. It doesn't have any 
benefit for troubleshooting.

Solution: 
* Use the exception message as the event message when the exception message 
isn't null
* Use the whole exception as the event message if the exception message is null.

 !image-2024-03-28-11-57-35-234.png! 





Re: [PR] [FLINK-34956][doc] Fix the config type wrong of Duration [flink]

2024-03-27 Thread via GitHub


flinkbot commented on PR #24581:
URL: https://github.com/apache/flink/pull/24581#issuecomment-2024335796

   
   ## CI report:
   
   * 62b4ebc169137a9d72e73481d8bcae51b5c2cc8b UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]

2024-03-27 Thread via GitHub


yanghuaiGit commented on PR #2916:
URL: https://github.com/apache/flink-cdc/pull/2916#issuecomment-2024335047

   The latest Paimon version is 0.7; we should update the Paimon version from 0.6 to 0.7.





[jira] [Updated] (FLINK-34956) The config type is wrong for Duration

2024-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34956:
---
Labels: pull-request-available  (was: )

> The config type is wrong for Duration
> -
>
> Key: FLINK-34956
> URL: https://issues.apache.org/jira/browse/FLINK-34956
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: image-2024-03-28-11-21-31-802.png
>
>
> The Config type is Boolean, but it should be Duration.
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/
>  !image-2024-03-28-11-21-31-802.png! 





[PR] [FLINK-34956][doc] Fix the config type wrong of Duration [flink]

2024-03-27 Thread via GitHub


1996fanrui opened a new pull request, #24581:
URL: https://github.com/apache/flink/pull/24581

   ## What is the purpose of the change
   
   The Config type is Boolean, but it should be Duration.
   
   https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/
   
   
![image](https://github.com/apache/flink/assets/38427477/f1156c3e-e3e2-4d36-88a0-ed2976a598f6)
   
   
   ## Brief change log
   
   - [FLINK-34956][doc] Fix the config type wrong of Duration





Re: [PR] [fix][cdc-connector][mysql] Fix NoClassDefFoundError when create new table in mysql cdc source [flink-cdc]

2024-03-27 Thread via GitHub


meicao2999 commented on PR #3036:
URL: https://github.com/apache/flink-cdc/pull/3036#issuecomment-2024328996

   I have also encountered this NoClassDefFoundError. Is there a specific class





[jira] [Created] (FLINK-34956) The config type is wrong for Duration

2024-03-27 Thread Rui Fan (Jira)
Rui Fan created FLINK-34956:
---

 Summary: The config type is wrong for Duration
 Key: FLINK-34956
 URL: https://issues.apache.org/jira/browse/FLINK-34956
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Rui Fan
Assignee: Rui Fan
 Fix For: 1.20.0
 Attachments: image-2024-03-28-11-21-31-802.png

The Config type is Boolean, but it should be Duration.

https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/

 !image-2024-03-28-11-21-31-802.png! 





Re: [PR] [cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]

2024-03-27 Thread via GitHub


skymilong commented on PR #3194:
URL: https://github.com/apache/flink-cdc/pull/3194#issuecomment-2024327508

   > @skymilong Thanks for the PR! According to the code contribution rule of 
Apache Flink, could you create an issue on 
[Jira](https://issues.apache.org/jira) for this and include the Jira ID in the 
PR title and commit message? You can take #3160 as an example.
   > 
   > And the PR has code formatting issue. Please run `mvn spotless:apply` 
before pushing your commits.
   > 
   > Thanks!
   
   Thanks a bunch for your help. This is my first rodeo contributing code on 
GitHub, so I'm still trying to get the hang of things. I'll whip up a Jira 
issue like you suggested and pop the ID into the PR and commit messages. Also, 
I'll iron out those code formatting kinks. I'll get on it pronto and get the PR 
updated.
   Appreciate your patience and all your help.





Re: [PR] [FLINK-33985][runtime] Support obtain all partitions existing in cluster through ShuffleMaster. [flink]

2024-03-27 Thread via GitHub


zhuzhurk commented on code in PR #24553:
URL: https://github.com/apache/flink/pull/24553#discussion_r1542254332


##
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java:
##
@@ -62,7 +61,7 @@ public class NettyShuffleMaster implements 
ShuffleMaster
 
 @Nullable private final TieredInternalShuffleMaster 
tieredInternalShuffleMaster;
 
-private final Map jobMasters = new HashMap<>();
+private final Map jobMasterShuffleMasters = new 
HashMap<>();

Review Comment:
   I prefer to name it as `jobShuffleContexts`






Re: [PR] [build] fix inconsistent Kafka shading among cdc connectors [flink-cdc]

2024-03-27 Thread via GitHub


link3280 commented on code in PR #2988:
URL: https://github.com/apache/flink-cdc/pull/2988#discussion_r1542247969


##
flink-cdc-connect/flink-cdc-source-connectors/flink-sql-connector-tidb-cdc/pom.xml:
##
@@ -60,6 +60,12 @@ under the License.
 
 
 
+
+org.apache.kafka
+
+
com.ververica.cdc.connectors.shaded.org.apache.kafka

Review Comment:
   The shaded pattern is updated. Thanks.






Re: [PR] [FLINK-34906] Only scale when all tasks are running [flink-kubernetes-operator]

2024-03-27 Thread via GitHub


1996fanrui commented on PR #801:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/801#issuecomment-2024295706

   > Thanks Rui! The changes make sense to me. To Gyulas point, I think we 
should try to deduplicate the logic such that both Kubernetes autoscaler and 
standalone use the same code path.
   
   I could move `JobStatusUtils` from the `flink-autoscaler-standalone` module to 
the `flink-autoscaler` module; then both the `flink-autoscaler-standalone` and 
`flink-kubernetes-operator` modules can use it.
   
   Hey @gyfora , what do you think about it?





Re: [PR] [build] fix jackson conflicts among cdc connectors [flink-cdc]

2024-03-27 Thread via GitHub


link3280 commented on PR #2987:
URL: https://github.com/apache/flink-cdc/pull/2987#issuecomment-2024287943

   ping @leonardBang @PatrickRen 





Re: [PR] [docs]Change host to host name in configuration for readme [flink-cdc]

2024-03-27 Thread via GitHub


cjj2010 commented on PR #3076:
URL: https://github.com/apache/flink-cdc/pull/3076#issuecomment-2024282059

   > Could you
   
   The rebase is complete now.





[jira] (FLINK-34556) Migrate EnumerableToLogicalTableScan

2024-03-27 Thread Jacky Lau (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34556 ]


Jacky Lau deleted comment on FLINK-34556:
---

was (Author: jackylau):
hi [~snuyanzin]  will you also help review this pr ?

> Migrate EnumerableToLogicalTableScan
> 
>
> Key: FLINK-34556
> URL: https://issues.apache.org/jira/browse/FLINK-34556
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: Jacky Lau
>Assignee: Jacky Lau
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-03-27 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2024262258

   Hi @MartijnVisser, INTERSECT is different from array_intersect. INTERSECT is 
a set RelNode, like UNION / UNION ALL, which is indeed part of the SQL standard, 
while array_intersect and array_union are just functions, which don't have any standard.





Re: [PR] [FLINK-34955] Upgrade commons-compress to 1.26.0. [flink]

2024-03-27 Thread via GitHub


flinkbot commented on PR #24580:
URL: https://github.com/apache/flink/pull/24580#issuecomment-2024252852

   
   ## CI report:
   
   * 9741745f8b3a6395e4ab4cc530d11d813addce9f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-34955) Upgrade commons-compress to 1.26.0

2024-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34955:
---
Labels: pull-request-available  (was: )

> Upgrade commons-compress to 1.26.0
> --
>
> Key: FLINK-34955
> URL: https://issues.apache.org/jira/browse/FLINK-34955
> Project: Flink
>  Issue Type: Improvement
>Reporter: Shilun Fan
>Priority: Major
>  Labels: pull-request-available
>
> commons-compress 1.24.0 has known CVE issues, so we should upgrade to 1.26.0. 
> For available versions, see the Maven repository link:
> https://mvnrepository.com/artifact/org.apache.commons/commons-compress





[PR] [FLINK-34955] Upgrade commons-compress to 1.26.0. [flink]

2024-03-27 Thread via GitHub


slfan1989 opened a new pull request, #24580:
URL: https://github.com/apache/flink/pull/24580

   
   
   ## What is the purpose of the change
   
   JIRA: [FLINK-34955] Upgrade commons-compress to 1.26.0.
   
   commons-compress 1.24.0 has known CVE issues, so we should upgrade to 1.26.0. 
For available versions, see the Maven repository link:
   
   https://mvnrepository.com/artifact/org.apache.commons/commons-compress
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





[jira] [Created] (FLINK-34955) Upgrade commons-compress to 1.26.0

2024-03-27 Thread Shilun Fan (Jira)
Shilun Fan created FLINK-34955:
--

 Summary: Upgrade commons-compress to 1.26.0
 Key: FLINK-34955
 URL: https://issues.apache.org/jira/browse/FLINK-34955
 Project: Flink
  Issue Type: Improvement
Reporter: Shilun Fan


commons-compress 1.24.0 has known CVE issues, so we should upgrade to 1.26.0. 
For available versions, see the Maven repository link:

https://mvnrepository.com/artifact/org.apache.commons/commons-compress





[jira] [Closed] (FLINK-34943) Support Flink 1.19, 1.20-SNAPSHOT for JDBC connector

2024-03-27 Thread Zhongqiang Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhongqiang Gong closed FLINK-34943.
---
Resolution: Fixed

Fixed by https://github.com/apache/flink-connector-jdbc/pull/107

> Support Flink 1.19, 1.20-SNAPSHOT for JDBC connector
> 
>
> Key: FLINK-34943
> URL: https://issues.apache.org/jira/browse/FLINK-34943
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Reporter: Zhongqiang Gong
>Priority: Major
>  Labels: pull-request-available
>






Re: [PR] FLINK-31361 - Added the right shaded dependency for SQL client connector for kafka - Updated kafka.md [flink-connector-kafka]

2024-03-27 Thread via GitHub


alpinegizmo commented on code in PR #88:
URL: 
https://github.com/apache/flink-connector-kafka/pull/88#discussion_r1542159853


##
docs/content/docs/connectors/table/kafka.md:
##
@@ -674,7 +674,7 @@ Please note that the class path of the login module in 
`sasl.jaas.config` might
 client dependencies, so you may need to rewrite it with the actual class path 
of the module in the JAR.
 SQL client JAR has relocated Kafka client dependencies to 
`org.apache.flink.kafka.shaded.org.apache.kafka`,
 then the path of plain login module in code snippets above need to be
-`org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule`
 when using SQL client JAR.
+`org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule`
 when using SQL client JAR(flink-sql-connector-kafka-x.xx.x.jar).

Review Comment:
   ```suggestion
   
`org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule`
 when using SQL client JAR (flink-sql-connector-kafka-x.xx.x.jar).
   ```
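
The relocation described in this doc change can be illustrated with a short Java sketch. It is only a sketch under stated assumptions: the class `ShadedJaasConfig` and the method `toShaded` are hypothetical helpers, not part of Flink or Kafka. Only the prefix `org.apache.flink.kafka.shaded.` and the `PlainLoginModule` class name come from the documentation text quoted above.

```java
final class ShadedJaasConfig {

    // Relocation prefix used by the SQL client JAR, as stated in kafka.md.
    static final String SHADED_PREFIX = "org.apache.flink.kafka.shaded.";
    static final String KAFKA_PACKAGE = "org.apache.kafka.";

    // Hypothetical helper: rewrites Kafka class names in a JAAS config string
    // to their relocated form. A string that already contains a relocated
    // name is returned unchanged, so the call is safe to repeat.
    static String toShaded(String jaasConfig) {
        if (jaasConfig.contains(SHADED_PREFIX + KAFKA_PACKAGE)) {
            return jaasConfig;
        }
        return jaasConfig.replace(KAFKA_PACKAGE, SHADED_PREFIX + KAFKA_PACKAGE);
    }

    public static void main(String[] args) {
        // Placeholder credentials, for illustration only.
        String plain =
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"user\" password=\"secret\";";
        System.out.println(toShaded(plain));
    }
}
```

The rewritten string is what `sasl.jaas.config` would need to contain when the SQL client JAR is on the classpath; with the non-shaded connector JAR the original class name applies.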






[PR] Bump org.elasticsearch:elasticsearch from 7.10.2 to 7.17.19 in /flink-connector-elasticsearch-base [flink-connector-elasticsearch]

2024-03-27 Thread via GitHub


dependabot[bot] opened a new pull request, #95:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/95

   
   
   [![Dependabot compatibility 
score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=org.elasticsearch:elasticsearch=maven=7.10.2=7.17.19)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)
   
   Dependabot will resolve any conflicts with this PR as long as you don't 
alter it yourself. You can also trigger a rebase manually by commenting 
`@dependabot rebase`.
   
   
   





Re: [PR] [FLINK-31497][table] Drop the deprecated CatalogViewImpl [flink]

2024-03-27 Thread via GitHub


flinkbot commented on PR #24579:
URL: https://github.com/apache/flink/pull/24579#issuecomment-2023946346

   
   ## CI report:
   
   * e5666b868012f919d639486030aa0bc5a5f25142 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-31497) Drop the deprecated CatalogViewImpl

2024-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31497:
---
Labels: pull-request-available  (was: )

> Drop the deprecated CatalogViewImpl 
> 
>
> Key: FLINK-31497
> URL: https://issues.apache.org/jira/browse/FLINK-31497
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: WenJun Min
>Priority: Major
>  Labels: pull-request-available
>
> After https://issues.apache.org/jira/browse/FLINK-29585,
> CatalogViewImpl is no longer used in the Flink project, so we may be able to 
> drop it now. cc [~snuyanzin]
> However, we may have to check whether it is still used by other connectors.





[PR] [FLINK-31497][table] Drop the deprecated CatalogViewImpl [flink]

2024-03-27 Thread via GitHub


jeyhunkarimov opened a new pull request, #24579:
URL: https://github.com/apache/flink/pull/24579

   ## What is the purpose of the change
   
   Drop the deprecated CatalogViewImpl
   
   ## Brief change log
   
 - Refactor documentation
 - Remove CatalogViewImpl
 - Refactor python code that uses CatalogViewImpl
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   





Re: [PR] [FLINK-33676] Implement RestoreTests for WindowAggregate [flink]

2024-03-27 Thread via GitHub


jnh5y commented on PR #23886:
URL: https://github.com/apache/flink/pull/23886#issuecomment-2023874164

   @flinkbot run azure





Re: [PR] [FLINK-33805] Implement restore tests for OverAggregate node [flink]

2024-03-27 Thread via GitHub


jnh5y commented on code in PR #24565:
URL: https://github.com/apache/flink/pull/24565#discussion_r1541935341


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateTestPrograms.java:
##
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import static 
org.apache.flink.table.api.config.TableConfigOptions.LOCAL_TIME_ZONE;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecOverAggregate}. */
+public class OverAggregateTestPrograms {
+
+static final Row[] DATA = {

Review Comment:
   Yes.






Re: [PR] [FLINK-34538][docs] Add Autotuning documentation [flink-kubernetes-operator]

2024-03-27 Thread via GitHub


mxm merged PR #807:
URL: https://github.com/apache/flink-kubernetes-operator/pull/807





Re: [PR] [FLINK-34538][docs] Add Autotuning documentation [flink-kubernetes-operator]

2024-03-27 Thread via GitHub


mxm commented on PR #807:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/807#issuecomment-2023796047

   Thanks for the quick review. @1996fanrui Let me know if you have any 
comments. I'll address them post-merge.





Re: [PR] [FLINK-33676] Implement RestoreTests for WindowAggregate [flink]

2024-03-27 Thread via GitHub


jnh5y commented on code in PR #23886:
URL: https://github.com/apache/flink/pull/23886#discussion_r1541903259


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java:
##
@@ -1,528 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/** Test json serialization/deserialization for window aggregate. */
-class WindowAggregateJsonPlanTest extends TableTestBase {
-
-private StreamTableTestUtil util;
-private TableEnvironment tEnv;
-
-@BeforeEach
-void setup() {
-util = streamTestUtil(TableConfig.getDefault());
-tEnv = util.getTableEnv();
-
-String insertOnlyTableDdl =
-"CREATE TABLE MyTable (\n"
-+ " a INT,\n"
-+ " b BIGINT,\n"
-+ " c VARCHAR,\n"
-+ " `rowtime` AS TO_TIMESTAMP(c),\n"
-+ " proctime as PROCTIME(),\n"
-+ " WATERMARK for `rowtime` AS `rowtime` - INTERVAL 
'1' SECOND\n"
-+ ") WITH (\n"
-+ " 'connector' = 'values')\n";
-tEnv.executeSql(insertOnlyTableDdl);
-
-String changelogTableDdl =
-"CREATE TABLE MyCDCTable (\n"
-+ " a INT,\n"
-+ " b BIGINT,\n"
-+ " c VARCHAR,\n"
-+ " `rowtime` AS TO_TIMESTAMP(c),\n"
-+ " proctime as PROCTIME(),\n"
-+ " WATERMARK for `rowtime` AS `rowtime` - INTERVAL 
'1' SECOND\n"
-+ ") WITH (\n"
-+ " 'connector' = 'values',\n"
-+ " 'changelog-mode' = 'I,UA,UB,D')\n";
-tEnv.executeSql(changelogTableDdl);
-}
-
-@Test
-void testEventTimeTumbleWindow() {
-tEnv.createFunction("concat_distinct_agg", 
ConcatDistinctAggFunction.class);
-String sinkTableDdl =
-"CREATE TABLE MySink (\n"
-+ " b BIGINT,\n"
-+ " window_start TIMESTAMP(3),\n"
-+ " window_end TIMESTAMP(3),\n"
-+ " cnt BIGINT,\n"
-+ " sum_a INT,\n"
-+ " distinct_cnt BIGINT,\n"
-+ " concat_distinct STRING\n"
-+ ") WITH (\n"
-+ " 'connector' = 'values')\n";
-tEnv.executeSql(sinkTableDdl);
-util.verifyJsonPlan(
-"insert into MySink select\n"
-+ "  b,\n"
-+ "  window_start,\n"
-+ "  window_end,\n"
-+ "  COUNT(*),\n"
-+ "  SUM(a),\n"
-+ "  COUNT(DISTINCT c),\n"
-+ "  concat_distinct_agg(c)\n"
-+ "FROM TABLE(\n"
-+ "   TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), 
INTERVAL '5' SECOND))\n"
-+ "GROUP BY b, window_start, window_end");
-}
-
-@Test
-void testEventTimeTumbleWindowWithCDCSource() {
-tEnv.createFunction("concat_distinct_agg", 
ConcatDistinctAggFunction.class);
-String sinkTableDdl =
-"CREATE TABLE MySink (\n"
-+ " b BIGINT,\n"
-+ " window_start TIMESTAMP(3),\n"
-+ " window_end TIMESTAMP(3),\n"
-+ " cnt BIGINT,\n"
-+ " sum_a INT,\n"
- 

[jira] [Updated] (FLINK-34954) Kryo input implementation NoFetchingInput fails to handle zero length bytes

2024-03-27 Thread Qinghui Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qinghui Xu updated FLINK-34954:
---
Description: 
If the serialized bytes are empty, `NoFetchingInput` runs into an error when 
Kryo tries to deserialize them.

Example: a protobuf 3 object that contains only default values is 
serialized as a zero-length byte array, and the later deserialization fails. 
Illustration:
{noformat}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{ByteBufferInput, ByteBufferOutput, Input, Output}
import com.google.protobuf.{DescriptorProtos, Message}
import com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
import java.io.ByteArrayInputStream

object ProtoSerializationTest {
  def main(args: Array[String]) = {
    val chillProtoSerializer = new ProtobufSerializer
    val protomessage = DescriptorProtos.DescriptorProto.getDefaultInstance
    val output: Output = new ByteBufferOutput(1000)
    chillProtoSerializer.write(null, output, protomessage)
    val serialized: Array[Byte] = output.toBytes
    println(s"Serialized : $serialized")
    val input: Input = new NoFetchingInput(new ByteArrayInputStream(serialized))
    val deserialized = chillProtoSerializer.read(null, input,
      classOf[BillableClick].asInstanceOf[Class[Message]])
    println(deserialized)
  }
}
{noformat}
 

Error
{noformat}
Exception in thread "main" java.lang.RuntimeException: Could not create class 
com.criteo.glup.BillableClickProto$BillableClick
    at 
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
    at 
com.criteo.streaming.common.bootstrap.ProtoSerialization$.main(ProtoSerialization.scala:22)
    at ProtoSerialization.main(ProtoSerialization.scala)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No 
more bytes left.
    at 
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
    at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:332)
    at 
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
    ... 2 more
Caused by: java.io.EOFException: No more bytes left.
    ... 5 more{noformat}

  was:
If the serialized bytes are empty, `NoFetchingInput` runs into an error when 
Kryo tries to deserialize them.

Example: a protobuf 3 object that contains only default values is 
serialized as a zero-length byte array, and the later deserialization fails. 
Illustration:
```
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{ByteBufferInput, ByteBufferOutput, Input, Output}

import com.google.protobuf.{DescriptorProtos, Message}
import com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput

import java.io.ByteArrayInputStream

object ProtoSerializationTest {

  def main(args: Array[String]) = {
    val chillProtoSerializer = new ProtobufSerializer

    val protomessage = DescriptorProtos.DescriptorProto.getDefaultInstance
    val output: Output = new ByteBufferOutput(1000)
    chillProtoSerializer.write(null, output, protomessage)
    val serialized: Array[Byte] = output.toBytes
    println(s"Serialized : $serialized")
    val input: Input = new NoFetchingInput(new ByteArrayInputStream(serialized))
    val deserialized = chillProtoSerializer.read(null, input,
      classOf[BillableClick].asInstanceOf[Class[Message]])
    println(deserialized)
  }
}
```

Error
```

Exception in thread "main" java.lang.RuntimeException: Could not create class 
com.criteo.glup.BillableClickProto$BillableClick
    at 
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
    at 
com.criteo.streaming.common.bootstrap.ProtoSerialization$.main(ProtoSerialization.scala:22)
    at ProtoSerialization.main(ProtoSerialization.scala)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No 
more bytes left.
    at 
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
    at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:332)
    at 
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
    ... 2 more
Caused by: java.io.EOFException: No more bytes left.
    ... 5 more

```


> Kryo input implementation NoFetchingInput fails to handle zero length bytes
> ---
>
> Key: FLINK-34954
> URL: https://issues.apache.org/jira/browse/FLINK-34954
> Project: Flink
>  Issue Type: Bug
>Reporter: Qinghui Xu
>Priority: Major
>
> If the serialized bytes are empty, `NoFetchingInput` runs into an error when 
> Kryo tries to deserialize them.
> Example: a protobuf 3 object that contains only default values will be 
> serialized as 0 

Re: [PR] [FLINK-33676] Implement RestoreTests for WindowAggregate [flink]

2024-03-27 Thread via GitHub


jnh5y commented on code in PR #23886:
URL: https://github.com/apache/flink/pull/23886#discussion_r1541901567


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateTestPrograms.java:
##
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+import java.util.function.Function;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecWindowAggregate}. */
+public class WindowAggregateTestPrograms {
+
+static final Row[] BEFORE_DATA = {
+Row.of("2020-10-10 00:00:01", 1, 1d, 1f, new BigDecimal("1.11"), "Hi", 
"a"),
+Row.of("2020-10-10 00:00:02", 2, 2d, 2f, new BigDecimal("2.22"), 
"Comment#1", "a"),
+Row.of("2020-10-10 00:00:03", 2, 2d, 2f, new BigDecimal("2.22"), 
"Comment#1", "a"),
+Row.of("2020-10-10 00:00:04", 5, 5d, 5f, new BigDecimal("5.55"), null, 
"a"),
+Row.of("2020-10-10 00:00:07", 3, 3d, 3f, null, "Hello", "b"),
+// out of order
+Row.of("2020-10-10 00:00:06", 6, 6d, 6f, new BigDecimal("6.66"), "Hi", 
"b"),
+Row.of("2020-10-10 00:00:08", 3, null, 3f, new BigDecimal("3.33"), 
"Comment#2", "a"),
+// late event
+Row.of("2020-10-10 00:00:04", 5, 5d, null, new BigDecimal("5.55"), 
"Hi", "a"),
+Row.of("2020-10-10 00:00:16", 4, 4d, 4f, new BigDecimal("4.44"), "Hi", 
"b"),
+Row.of("2020-10-10 00:00:32", 7, 7d, 7f, new BigDecimal("7.77"), null, 
null),
+Row.of("2020-10-10 00:00:34", 1, 3d, 3f, new BigDecimal("3.33"), 
"Comment#3", "b")
+};
+
+static final Row[] AFTER_DATA = {
+Row.of("2020-10-10 00:00:40", 10, 3d, 3f, new BigDecimal("4.44"), 
"Comment#4", "a"),
+Row.of("2020-10-10 00:00:42", 11, 4d, 4f, new BigDecimal("5.44"), 
"Comment#5", "d"),
+Row.of("2020-10-10 00:00:43", 12, 5d, 5f, new BigDecimal("6.44"), 
"Comment#6", "c"),
+Row.of("2020-10-10 00:00:44", 13, 6d, 6f, new BigDecimal("7.44"), 
"Comment#7", "d")
+};
+
+static final Function SOURCE_BUILDER =
+str ->
+SourceTestStep.newBuilder(str)
+.addSchema(
+"ts STRING",
+"a_int INT",
+"b_double DOUBLE",
+"c_float FLOAT",
+"d_bigdec DECIMAL(10, 2)",
+"`comment` STRING",
+"name STRING",
+"`rowtime` AS TO_TIMESTAMP(`ts`)",
+"`proctime` AS PROCTIME()",
+"WATERMARK for `rowtime` AS `rowtime` - 
INTERVAL '1' SECOND")
+.addOption("changelog-mode", "I,UA,UB,D")
+.producedBeforeRestore(BEFORE_DATA)
+.producedAfterRestore(AFTER_DATA);
+static final SourceTestStep SOURCE = 
SOURCE_BUILDER.apply("window_source_t").build();
+
+static final SourceTestStep CDC_SOURCE =
+SOURCE_BUILDER
+.apply("cdc_window_source_t")
+.addOption("changelog-mode", "I,UA,UB,D")
+.build();
+
+static final String[] TUMBLE_EVENT_TIME_BEFORE_ROWS = {
+"+I[a, 2020-10-10T00:00, 2020-10-10T00:00:05, 4, 10, 2]",
+"+I[a, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 1, 3, 1]",
+"+I[b, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 2, 9, 2]",
+"+I[b, 2020-10-10T00:00:15, 2020-10-10T00:00:20, 1, 4, 1]"
+};
+
+public static final String[] TUMBLE_EVENT_TIME_AFTER_ROWS = {
+"+I[b, 2020-10-10T00:00:30, 2020-10-10T00:00:35, 

Re: [PR] [FLINK-33676] Implement RestoreTests for WindowAggregate [flink]

2024-03-27 Thread via GitHub


jnh5y commented on code in PR #23886:
URL: https://github.com/apache/flink/pull/23886#discussion_r1541901325


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateTestPrograms.java:
##
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+import java.util.function.Function;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecWindowAggregate}. */
+public class WindowAggregateTestPrograms {
+
+static final Row[] BEFORE_DATA = {

Review Comment:
   Yes.  Doing it.  






Re: [PR] [FLINK-33676] Implement RestoreTests for WindowAggregate [flink]

2024-03-27 Thread via GitHub


jnh5y commented on code in PR #23886:
URL: https://github.com/apache/flink/pull/23886#discussion_r1541900291


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateTestPrograms.java:
##
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+import java.util.function.Function;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecWindowAggregate}. */
+public class WindowAggregateTestPrograms {
+
+static final Row[] BEFORE_DATA = {
+Row.of("2020-10-10 00:00:01", 1, 1d, 1f, new BigDecimal("1.11"), "Hi", 
"a"),
+Row.of("2020-10-10 00:00:02", 2, 2d, 2f, new BigDecimal("2.22"), 
"Comment#1", "a"),
+Row.of("2020-10-10 00:00:03", 2, 2d, 2f, new BigDecimal("2.22"), 
"Comment#1", "a"),
+Row.of("2020-10-10 00:00:04", 5, 5d, 5f, new BigDecimal("5.55"), null, 
"a"),
+Row.of("2020-10-10 00:00:07", 3, 3d, 3f, null, "Hello", "b"),
+// out of order
+Row.of("2020-10-10 00:00:06", 6, 6d, 6f, new BigDecimal("6.66"), "Hi", 
"b"),
+Row.of("2020-10-10 00:00:08", 3, null, 3f, new BigDecimal("3.33"), 
"Comment#2", "a"),
+// late event
+Row.of("2020-10-10 00:00:04", 5, 5d, null, new BigDecimal("5.55"), 
"Hi", "a"),
+Row.of("2020-10-10 00:00:16", 4, 4d, 4f, new BigDecimal("4.44"), "Hi", 
"b"),
+Row.of("2020-10-10 00:00:32", 7, 7d, 7f, new BigDecimal("7.77"), null, 
null),
+Row.of("2020-10-10 00:00:34", 1, 3d, 3f, new BigDecimal("3.33"), 
"Comment#3", "b")
+};
+
+static final Row[] AFTER_DATA = {
+Row.of("2020-10-10 00:00:40", 10, 3d, 3f, new BigDecimal("4.44"), 
"Comment#4", "a"),
+Row.of("2020-10-10 00:00:42", 11, 4d, 4f, new BigDecimal("5.44"), 
"Comment#5", "d"),
+Row.of("2020-10-10 00:00:43", 12, 5d, 5f, new BigDecimal("6.44"), 
"Comment#6", "c"),
+Row.of("2020-10-10 00:00:44", 13, 6d, 6f, new BigDecimal("7.44"), 
"Comment#7", "d")
+};
+
+static final Function SOURCE_BUILDER =
+str ->
+SourceTestStep.newBuilder(str)
+.addSchema(
+"ts STRING",
+"a_int INT",
+"b_double DOUBLE",
+"c_float FLOAT",
+"d_bigdec DECIMAL(10, 2)",
+"`comment` STRING",
+"name STRING",
+"`rowtime` AS TO_TIMESTAMP(`ts`)",
+"`proctime` AS PROCTIME()",
+"WATERMARK for `rowtime` AS `rowtime` - 
INTERVAL '1' SECOND")
+.addOption("changelog-mode", "I,UA,UB,D")
+.producedBeforeRestore(BEFORE_DATA)
+.producedAfterRestore(AFTER_DATA);
+static final SourceTestStep SOURCE = 
SOURCE_BUILDER.apply("window_source_t").build();
+
+static final SourceTestStep CDC_SOURCE =
+SOURCE_BUILDER
+.apply("cdc_window_source_t")
+.addOption("changelog-mode", "I,UA,UB,D")
+.build();
+
+static final String[] TUMBLE_EVENT_TIME_BEFORE_ROWS = {
+"+I[a, 2020-10-10T00:00, 2020-10-10T00:00:05, 4, 10, 2]",
+"+I[a, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 1, 3, 1]",
+"+I[b, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 2, 9, 2]",
+"+I[b, 2020-10-10T00:00:15, 2020-10-10T00:00:20, 1, 4, 1]"
+};
+
+public static final String[] TUMBLE_EVENT_TIME_AFTER_ROWS = {
+"+I[b, 2020-10-10T00:00:30, 2020-10-10T00:00:35, 

[jira] [Created] (FLINK-34954) Kryo input implementation NoFetchingInput fails to handle zero length bytes

2024-03-27 Thread Qinghui Xu (Jira)
Qinghui Xu created FLINK-34954:
--

 Summary: Kryo input implementation NoFetchingInput fails to handle 
zero length bytes
 Key: FLINK-34954
 URL: https://issues.apache.org/jira/browse/FLINK-34954
 Project: Flink
  Issue Type: Bug
Reporter: Qinghui Xu


If the serialized bytes are empty, `NoFetchingInput` runs into an error when 
Kryo tries to deserialize them.

Example: a protobuf 3 object that contains only default values is 
serialized as a zero-length byte array, and the later deserialization fails. 
Illustration:
```
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{ByteBufferInput, ByteBufferOutput, Input, Output}
import com.google.protobuf.{DescriptorProtos, Message}
import com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput

import java.io.ByteArrayInputStream

object ProtoSerializationTest {

  def main(args: Array[String]) = {
    val chillProtoSerializer = new ProtobufSerializer

    val protomessage = DescriptorProtos.DescriptorProto.getDefaultInstance
    val output: Output = new ByteBufferOutput(1000)
    chillProtoSerializer.write(null, output, protomessage)
    val serialized: Array[Byte] = output.toBytes
    println(s"Serialized : $serialized")
    val input: Input = new NoFetchingInput(new ByteArrayInputStream(serialized))
    val deserialized = chillProtoSerializer.read(null, input, 
classOf[BillableClick].asInstanceOf[Class[Message]])
    println(deserialized)
  }
}
```

Error
```

Exception in thread "main" java.lang.RuntimeException: Could not create class 
com.criteo.glup.BillableClickProto$BillableClick
    at 
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
    at 
com.criteo.streaming.common.bootstrap.ProtoSerialization$.main(ProtoSerialization.scala:22)
    at ProtoSerialization.main(ProtoSerialization.scala)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No 
more bytes left.
    at 
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
    at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:332)
    at 
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
    ... 2 more
Caused by: java.io.EOFException: No more bytes left.
    ... 5 more

```
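
A minimal Java sketch of one way a zero-length read can end in an `EOFException`. It does not reproduce the actual `NoFetchingInput` code; the class `ZeroLengthReadSketch` and both methods are hypothetical. It relies only on the documented JDK behaviour that `ByteArrayInputStream.read(byte[], int, int)` returns -1 at end of stream even when the requested length is 0, and shows a guard that returns early for a zero-length request.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical helper for illustration only; not Flink or Kryo code. */
final class ZeroLengthReadSketch {

    /** Reads exactly {@code count} bytes and treats any -1 from the stream as EOF. */
    static byte[] readNaive(InputStream in, int count) {
        byte[] buffer = new byte[count];
        int bytesRead = 0;
        try {
            do {
                int n = in.read(buffer, bytesRead, count - bytesRead);
                if (n == -1) {
                    // An exhausted ByteArrayInputStream reports -1 even for a 0-byte request.
                    throw new EOFException("No more bytes left.");
                }
                bytesRead += n;
            } while (bytesRead < count);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer;
    }

    /** Same as {@link #readNaive}, but a zero-length request never touches the stream. */
    static byte[] readGuarded(InputStream in, int count) {
        if (count == 0) {
            return new byte[0];
        }
        return readNaive(in, count);
    }

    public static void main(String[] args) {
        byte[] empty = new byte[0];
        System.out.println(readGuarded(new ByteArrayInputStream(empty), 0).length);
        try {
            readNaive(new ByteArrayInputStream(empty), 0);
        } catch (UncheckedIOException e) {
            System.out.println("naive read failed: " + e.getCause().getMessage());
        }
    }
}
```

Whether the real fix belongs in `NoFetchingInput` or elsewhere is not settled by this sketch; it only shows why an empty payload needs an explicit zero-length path.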





Re: [PR] [FLINK-33676] Implement RestoreTests for WindowAggregate [flink]

2024-03-27 Thread via GitHub


jnh5y commented on code in PR #23886:
URL: https://github.com/apache/flink/pull/23886#discussion_r1541896906


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateTestPrograms.java:
##
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+import java.util.function.Function;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecWindowAggregate}. */
+public class WindowAggregateTestPrograms {
+
+static final Row[] BEFORE_DATA = {
+Row.of("2020-10-10 00:00:01", 1, 1d, 1f, new BigDecimal("1.11"), "Hi", 
"a"),
+Row.of("2020-10-10 00:00:02", 2, 2d, 2f, new BigDecimal("2.22"), 
"Comment#1", "a"),
+Row.of("2020-10-10 00:00:03", 2, 2d, 2f, new BigDecimal("2.22"), 
"Comment#1", "a"),
+Row.of("2020-10-10 00:00:04", 5, 5d, 5f, new BigDecimal("5.55"), null, 
"a"),
+Row.of("2020-10-10 00:00:07", 3, 3d, 3f, null, "Hello", "b"),
+// out of order
+Row.of("2020-10-10 00:00:06", 6, 6d, 6f, new BigDecimal("6.66"), "Hi", 
"b"),
+Row.of("2020-10-10 00:00:08", 3, null, 3f, new BigDecimal("3.33"), 
"Comment#2", "a"),
+// late event
+Row.of("2020-10-10 00:00:04", 5, 5d, null, new BigDecimal("5.55"), 
"Hi", "a"),
+Row.of("2020-10-10 00:00:16", 4, 4d, 4f, new BigDecimal("4.44"), "Hi", 
"b"),
+Row.of("2020-10-10 00:00:32", 7, 7d, 7f, new BigDecimal("7.77"), null, 
null),
+Row.of("2020-10-10 00:00:34", 1, 3d, 3f, new BigDecimal("3.33"), 
"Comment#3", "b")
+};
+
+static final Row[] AFTER_DATA = {
+Row.of("2020-10-10 00:00:40", 10, 3d, 3f, new BigDecimal("4.44"), 
"Comment#4", "a"),
+Row.of("2020-10-10 00:00:42", 11, 4d, 4f, new BigDecimal("5.44"), 
"Comment#5", "d"),
+Row.of("2020-10-10 00:00:43", 12, 5d, 5f, new BigDecimal("6.44"), 
"Comment#6", "c"),
+Row.of("2020-10-10 00:00:44", 13, 6d, 6f, new BigDecimal("7.44"), 
"Comment#7", "d")
+};
+
+static final Function<String, SourceTestStep.Builder> SOURCE_BUILDER =
+str ->
+SourceTestStep.newBuilder(str)
+.addSchema(
+"ts STRING",
+"a_int INT",
+"b_double DOUBLE",
+"c_float FLOAT",
+"d_bigdec DECIMAL(10, 2)",
+"`comment` STRING",
+"name STRING",
+"`rowtime` AS TO_TIMESTAMP(`ts`)",
+"`proctime` AS PROCTIME()",
+"WATERMARK for `rowtime` AS `rowtime` - 
INTERVAL '1' SECOND")
+.addOption("changelog-mode", "I,UA,UB,D")
+.producedBeforeRestore(BEFORE_DATA)
+.producedAfterRestore(AFTER_DATA);
+static final SourceTestStep SOURCE = 
SOURCE_BUILDER.apply("window_source_t").build();
+
+static final SourceTestStep CDC_SOURCE =
+SOURCE_BUILDER
+.apply("cdc_window_source_t")
+.addOption("changelog-mode", "I,UA,UB,D")
+.build();

Review Comment:
   Ah, I made a mistake; removing the changelog for the SOURCE_BUILDER.
   
   Just to say it out loud, the CDC_SOURCE is set up to help cover some of the 
tests which @xuyangzhong added like this: 
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java#L98
   
   Overall, there are several orthogonal options, and at some point, 

[jira] [Commented] (FLINK-34950) Disable spotless on Java 21 for connector-shared-utils

2024-03-27 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831486#comment-17831486
 ] 

Sergey Nuyanzin commented on FLINK-34950:
-

Merged as 
[d719c95235db17f5932d1bb5d917f7d6e195c371|https://github.com/apache/flink-connector-shared-utils/commit/d719c95235db17f5932d1bb5d917f7d6e195c371]

> Disable spotless on Java 21 for connector-shared-utils
> --
>
> Key: FLINK-34950
> URL: https://issues.apache.org/jira/browse/FLINK-34950
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Parent
>Affects Versions: connector-parent-1.1.0
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> After https://github.com/apache/flink-connector-shared-utils/pull/19,
> spotless is no longer skipped for Java 17+ in the parent pom;
> however, we still need to skip it for Java 21+.





Re: [PR] [FLINK-34950] Disable spotless for Java 21+ [flink-connector-shared-utils]

2024-03-27 Thread via GitHub


snuyanzin merged PR #39:
URL: https://github.com/apache/flink-connector-shared-utils/pull/39


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34324][ci] Replaces AWS-based S3 e2e tests with Minio-backed version [flink]

2024-03-27 Thread via GitHub


RyanSkraba commented on code in PR #24465:
URL: https://github.com/apache/flink/pull/24465#discussion_r1541478084


##
flink-end-to-end-tests/test-scripts/test_file_sink.sh:
##
@@ -79,30 +42,69 @@ 
TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.
 #   sorted content of part files
 ###
 function get_complete_result {
-  if [ "${OUT_TYPE}" == "s3" ]; then
-s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" 
"part-" true
-  fi
   find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
 }
 
 ###
 # Get total number of lines in part files.
 #
 # Globals:
-#   S3_PREFIX
+#   OUTPUT_PATH
 # Arguments:
 #   None
 # Returns:
 #   line number in part files
 ###
 function get_total_number_of_valid_lines {
-  if [ "${OUT_TYPE}" == "local" ]; then
-get_complete_result | wc -l | tr -d '[:space:]'
-  elif [ "${OUT_TYPE}" == "s3" ]; then
-s3_get_number_of_lines_by_prefix "${S3_PREFIX}" "part-"
-  fi
+  get_complete_result | wc -l | tr -d '[:space:]'
 }
 
+if [ "${OUT_TYPE}" == "local" ]; then
+  echo "[INFO] Test run in local environment: No S3 environment is loaded."
+elif [ "${OUT_TYPE}" == "s3" ]; then
+  source "$(dirname "$0")"/common_s3_minio.sh
+  s3_setup hadoop
+
+  # overwrites JOB_OUTPUT_PATH to point to S3
+  S3_DATA_PREFIX="${RANDOM_PREFIX}"
+  S3_CHECKPOINT_PREFIX="${RANDOM_PREFIX}-chk"
+  JOB_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/${S3_DATA_PREFIX}"
+  set_config_key "state.checkpoints.dir" 
"s3://$IT_CASE_S3_BUCKET/${S3_CHECKPOINT_PREFIX}"

Review Comment:
   In the original, there was a line `mkdir -p "$OUTPUT_PATH-chk"`.  Is this no 
longer necessary?



##
flink-end-to-end-tests/test-scripts/test_file_sink.sh:
##
@@ -79,30 +42,69 @@ 
TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.
 #   sorted content of part files
 ###
 function get_complete_result {
-  if [ "${OUT_TYPE}" == "s3" ]; then
-s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" 
"part-" true
-  fi
   find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
 }
 
 ###
 # Get total number of lines in part files.
 #
 # Globals:
-#   S3_PREFIX
+#   OUTPUT_PATH
 # Arguments:
 #   None
 # Returns:
 #   line number in part files
 ###
 function get_total_number_of_valid_lines {
-  if [ "${OUT_TYPE}" == "local" ]; then
-get_complete_result | wc -l | tr -d '[:space:]'
-  elif [ "${OUT_TYPE}" == "s3" ]; then
-s3_get_number_of_lines_by_prefix "${S3_PREFIX}" "part-"
-  fi
+  get_complete_result | wc -l | tr -d '[:space:]'
 }
 
+if [ "${OUT_TYPE}" == "local" ]; then
+  echo "[INFO] Test run in local environment: No S3 environment is loaded."
+elif [ "${OUT_TYPE}" == "s3" ]; then
+  source "$(dirname "$0")"/common_s3_minio.sh
+  s3_setup hadoop
+
+  # overwrites JOB_OUTPUT_PATH to point to S3
+  S3_DATA_PREFIX="${RANDOM_PREFIX}"
+  S3_CHECKPOINT_PREFIX="${RANDOM_PREFIX}-chk"
+  JOB_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/${S3_DATA_PREFIX}"
+  set_config_key "state.checkpoints.dir" 
"s3://$IT_CASE_S3_BUCKET/${S3_CHECKPOINT_PREFIX}"
+
+  # overwrites implementation for local runs
+  function get_complete_result {
+# copies the data from S3 to the local OUTPUT_PATH
+s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" 
"$FILE_SINK_TEST_TEMP_SUBFOLDER" "part-" true
+
+# and prints the sorted output
+find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort 
-g
+  }
+
+  # overwrites implementation for local runs
+  function get_total_number_of_valid_lines {
+s3_get_number_of_lines_by_prefix "${FILE_SINK_TEST_TEMP_SUBFOLDER}" "part-"
+  }
+
+  # make sure we delete the file at the end
+  function out_cleanup {
+s3_delete_by_full_path_prefix "${S3_DATA_PREFIX}"
+s3_delete_by_full_path_prefix "${S3_CHECKPOINT_PREFIX}"
+  }
+
+  on_exit out_cleanup
+else
+  echo "[ERROR] Unknown out type: ${OUT_TYPE}"
+  exit 1
+fi
+
+# randomly set up openSSL with dynamically/statically linked libraries

Review Comment:
   Did you mean to copy the `OPENSSL_LINKAGE` lines here?



##
flink-end-to-end-tests/test-scripts/test_file_sink.sh:
##
@@ -79,30 +42,69 @@ 
TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.
 #   sorted content of part files
 ###
 function get_complete_result {
-  if [ "${OUT_TYPE}" == "s3" ]; then
-s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" 
"part-" true
-  fi
   find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
 }
 
 ###
 # Get total number of lines in part files.
 #
 # Globals:
-#   S3_PREFIX
+#   OUTPUT_PATH
 # Arguments:
 #   None
 # Returns:
 #   line number in 

[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs

2024-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34538:
---
Labels: pull-request-available  (was: )

> Tune Flink config of autoscaled jobs
> 
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
>
> Umbrella issue to tackle tuning the Flink configuration as part of Flink 
> Autoscaling.





[PR] [FLINK-34538][docs] Add Autotuning documentation [flink-kubernetes-operator]

2024-03-27 Thread via GitHub


mxm opened a new pull request, #807:
URL: https://github.com/apache/flink-kubernetes-operator/pull/807

   This adds documentation for Flink Autotuning.





[jira] [Comment Edited] (FLINK-34551) Align retry mechanisms of FutureUtils

2024-03-27 Thread Kumar Mallikarjuna (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831453#comment-17831453
 ] 

Kumar Mallikarjuna edited comment on FLINK-34551 at 3/27/24 4:38 PM:
-

Hi [~mapohl] , I've opened a PR 
([#24578|https://github.com/apache/flink/pull/24578]) refactoring 
`retryOperation()` and `retryOperationWithDelay()`. I haven't refactored 
`retrySuccessfulWithDelay()`, since you had some changes on that already in 
your PR ([#24309|https://github.com/apache/flink/pull/24309]). Would you mind 
reviewing?

 


was (Author: JIRAUSER303984):
Hi [~mapohl] , I've opened a PR 
([#24578|https://github.com/apache/flink/pull/24578]) refactoring 
`retryOperation()` and `retryOperationWithDelay()`. I haven't refactored 
`retrySuccessfulWithDelay()`, since you had some changes on that already in 
your PR ([#24309|https://github.com/apache/flink/pull/24309]).

 

 

> Align retry mechanisms of FutureUtils
> -
>
> Key: FLINK-34551
> URL: https://issues.apache.org/jira/browse/FLINK-34551
> Project: Flink
>  Issue Type: Technical Debt
>  Components: API / Core
>Affects Versions: 1.20.0
>Reporter: Matthias Pohl
>Assignee: Kumar Mallikarjuna
>Priority: Major
>  Labels: pull-request-available
>
> The retry mechanisms of FutureUtils include quite a bit of redundant code 
> which makes it hard to understand and to extend. The logic should be aligned 
> properly.





[jira] [Commented] (FLINK-34551) Align retry mechanisms of FutureUtils

2024-03-27 Thread Kumar Mallikarjuna (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831453#comment-17831453
 ] 

Kumar Mallikarjuna commented on FLINK-34551:


Hi [~mapohl] , I've opened a PR 
([#24578|https://github.com/apache/flink/pull/24578]) refactoring 
`retryOperation()` and `retryOperationWithDelay()`. I haven't refactored 
`retrySuccessfulWithDelay()`, since you had some changes on that already in 
your PR ([#24309|https://github.com/apache/flink/pull/24309]).

 

 

> Align retry mechanisms of FutureUtils
> -
>
> Key: FLINK-34551
> URL: https://issues.apache.org/jira/browse/FLINK-34551
> Project: Flink
>  Issue Type: Technical Debt
>  Components: API / Core
>Affects Versions: 1.20.0
>Reporter: Matthias Pohl
>Assignee: Kumar Mallikarjuna
>Priority: Major
>  Labels: pull-request-available
>
> The retry mechanisms of FutureUtils include quite a bit of redundant code 
> which makes it hard to understand and to extend. The logic should be aligned 
> properly.





Re: [PR] [FLINK-34551][core] Refactor retry logic in `FutureUtils` [flink]

2024-03-27 Thread via GitHub


flinkbot commented on PR #24578:
URL: https://github.com/apache/flink/pull/24578#issuecomment-2023246471

   
   ## CI report:
   
   * 791954d16c88be92ae08faa1775f8bbeb583c727 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-34551) Align retry mechanisms of FutureUtils

2024-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34551:
---
Labels: pull-request-available  (was: )

> Align retry mechanisms of FutureUtils
> -
>
> Key: FLINK-34551
> URL: https://issues.apache.org/jira/browse/FLINK-34551
> Project: Flink
>  Issue Type: Technical Debt
>  Components: API / Core
>Affects Versions: 1.20.0
>Reporter: Matthias Pohl
>Assignee: Kumar Mallikarjuna
>Priority: Major
>  Labels: pull-request-available
>
> The retry mechanisms of FutureUtils include quite a bit of redundant code 
> which makes it hard to understand and to extend. The logic should be aligned 
> properly.





[PR] [FLINK-34551][core] Refactor retry logic in `FutureUtils` [flink]

2024-03-27 Thread via GitHub


kumar-mallikarjuna opened a new pull request, #24578:
URL: https://github.com/apache/flink/pull/24578

   
   
   ## What is the purpose of the change
   
   `FutureUtils` has several retry methods whose implementations overlap. 
This PR refactors the `retryOperation()` and 
`retryOperationWithDelay()` methods.
   
   
   ## Brief change log
   
   - Make `retryOperation()` support scheduled execution
   - Remove `retryOperationWithDelay()`
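
A rough sketch of how a single retry entry point can cover both immediate and delayed retries. This is not the `FutureUtils` code from the PR: `RetrySketch`, `retry`, and `attempt` are hypothetical names, and the sketch assumes a plain `ScheduledExecutorService` is available for scheduling. Passing `Duration.ZERO` gives immediate retries, so a separate "with delay" variant is not needed.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper for illustration only; not the FutureUtils API. */
final class RetrySketch {

    /** Runs the operation and retries it up to {@code retries} times after a failure. */
    static <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> operation,
            int retries,
            Duration delay,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retries, delay, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation,
            int retriesLeft,
            Duration delay,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        if (result.isDone()) {
            return; // the caller already completed or cancelled the result
        }
        operation.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (retriesLeft > 0) {
                // Schedule the next attempt; a zero delay behaves like an immediate retry.
                scheduler.schedule(
                        () -> attempt(operation, retriesLeft - 1, delay, scheduler, result),
                        delay.toMillis(),
                        TimeUnit.MILLISECONDS);
            } else {
                result.completeExceptionally(error);
            }
        });
    }
}
```

A caller would pass a supplier that starts the asynchronous operation anew on each invocation, so every attempt gets a fresh future.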
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as `FutureUtilsTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (don't know) - The 
fabric8 client uses `FutureUtils`
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   





[PR] FLINK-31361 - Added the right shaded dependency for SQL client connector for kafka - Updated kafka.md [flink-connector-kafka]

2024-03-27 Thread via GitHub


diptimanr opened a new pull request, #88:
URL: https://github.com/apache/flink-connector-kafka/pull/88

   Updated the documentation to use the correct dependency for 
'properties.sasl.jaas.config'. When using 'flink-sql-connector-kafka.jar', 
the existing documentation doesn't use the shaded dependency.
   Also added the name of the jar file (flink-sql-connector-kafka-x.xx.x.jar).





[jira] [Updated] (FLINK-31361) job created by sql-client can't authenticate to kafka, can't find org.apache.kafka.common.security.plain.PlainLoginModule

2024-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31361:
---
Labels: pull-request-available  (was: )

> job created by sql-client can't authenticate to kafka, can't find 
> org.apache.kafka.common.security.plain.PlainLoginModule
> -
>
> Key: FLINK-31361
> URL: https://issues.apache.org/jira/browse/FLINK-31361
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.16.1
>Reporter: David Anderson
>Priority: Major
>  Labels: pull-request-available
>
> I'm working with this SQL DDL:
> {noformat}
> CREATE TABLE pageviews_sink (
>   `url` STRING,
>   `user_id` STRING,
>   `browser` STRING,
>   `ts` TIMESTAMP_LTZ(3)
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'pageviews',
>   'properties.bootstrap.servers' = 'xxx.confluent.cloud:9092',
>   'properties.security.protocol'='SASL_SSL',
>   'properties.sasl.mechanism'='PLAIN',
>   
> 'properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule
>  required username="xxx" password="xxx";',
>   'key.format' = 'json',
>   'key.fields' = 'url',
>   'value.format' = 'json'
> );
> {noformat}
> With {{flink-sql-connector-kafka-1.16.1.jar}} in the lib directory, this 
> fails with 
> {noformat}
> Caused by: javax.security.auth.login.LoginException: No LoginModule found for 
> org.apache.kafka.common.security.plain.PlainLoginModule{noformat}
> As a workaround I've found that it does work if I provide both
>  
> {{flink-connector-kafka-1.16.1.jar}}
> {{kafka-clients-3.2.3.jar}}
>  
> in the lib directory. It seems like the relocation applied in the SQL 
> connector isn't working properly.





Re: [PR] FLINK-31361 - Added the right shaded dependency for SQL client connector for kafka - Updated kafka.md [flink-connector-kafka]

2024-03-27 Thread via GitHub


boring-cyborg[bot] commented on PR #88:
URL: 
https://github.com/apache/flink-connector-kafka/pull/88#issuecomment-2023159980

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   





[jira] [Comment Edited] (FLINK-34937) Apache Infra GHA policy update

2024-03-27 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831422#comment-17831422
 ] 

Matthias Pohl edited comment on FLINK-34937 at 3/27/24 3:45 PM:


We should pin all actions (i.e. use the git SHA rather than a version tag) for 
external actions (anything other than {{actions/\*}}, {{github/\*}} and 
{{apache/\*}} prefixed actions). That's not the case right now.


was (Author: mapohl):
We should pin all actions (i.e. use the git SHA rather than a version tag) for 
external actions (anything other than {{actions/*}}, {{github/*}} and 
{{apache/*}} prefixed actions). That's not the case right now.

> Apache Infra GHA policy update
> --
>
> Key: FLINK-34937
> URL: https://issues.apache.org/jira/browse/FLINK-34937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System / CI
>Affects Versions: 1.19.0, 1.18.1, 1.20.0
>Reporter: Matthias Pohl
>Priority: Major
>
> There is a policy update [announced in the infra 
> ML|https://www.mail-archive.com/jdo-dev@db.apache.org/msg13638.html] which 
> asked Apache projects to limit the number of runners per job. Additionally, 
> the [GHA policy|https://infra.apache.org/github-actions-policy.html] is 
> referenced which I wasn't aware of when working on the action workflow.
> This issue is about applying the policy to the Flink GHA workflows.





[jira] [Commented] (FLINK-34937) Apache Infra GHA policy update

2024-03-27 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831422#comment-17831422
 ] 

Matthias Pohl commented on FLINK-34937:
---

We should pin all actions (i.e. use the git SHA rather than a version tag) for 
external actions (anything other than {{actions/*}}, {{github/*}} and 
{{apache/*}} prefixed actions). That's not the case right now.

> Apache Infra GHA policy update
> --
>
> Key: FLINK-34937
> URL: https://issues.apache.org/jira/browse/FLINK-34937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System / CI
>Affects Versions: 1.19.0, 1.18.1, 1.20.0
>Reporter: Matthias Pohl
>Priority: Major
>
> There is a policy update [announced in the infra 
> ML|https://www.mail-archive.com/jdo-dev@db.apache.org/msg13638.html] which 
> asked Apache projects to limit the number of runners per job. Additionally, 
> the [GHA policy|https://infra.apache.org/github-actions-policy.html] is 
> referenced which I wasn't aware of when working on the action workflow.
> This issue is about applying the policy to the Flink GHA workflows.





Re: [PR] [FLINK-33376][coordination] Extend ZooKeeper Curator configurations [flink]

2024-03-27 Thread via GitHub


XComp commented on PR #24563:
URL: https://github.com/apache/flink/pull/24563#issuecomment-2023008617

   no worries :-)





Re: [PR] [FLINK-33376][coordination] Extend ZooKeeper Curator configurations [flink]

2024-03-27 Thread via GitHub


JTaky commented on PR #24563:
URL: https://github.com/apache/flink/pull/24563#issuecomment-2023006486

   Thanks @XComp for the review, and sorry for more back and forth than there 
should have been.





Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]

2024-03-27 Thread via GitHub


mxm commented on code in PR #806:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/806#discussion_r1541299679


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
##
@@ -306,34 +310,42 @@ protected Map 
getVertexResources(
 }
 
 /**
- * Scale JM deployment to zero to gracefully stop all JM instances before 
any TMs are stopped.
- * This avoids race conditions between JM shutdown and TM shutdown / 
failure handling.
+ * Shut down JobManagers gracefully by scaling JM deployment to zero. This 
avoids race
+ * conditions between JM shutdown and TM shutdown / failure handling.
  *
  * @param jmDeployment
  * @param namespace
  * @param clusterId
- * @param timeout
+ * @param remainingTimeout
  * @return Remaining timeout after the operation.
  */
-private Duration scaleJmToZeroBlocking(
+private Duration shutdownJobManagers(
 EditReplacePatchable jmDeployment,
 String namespace,
 String clusterId,
-Duration timeout) {
-return deleteBlocking(
-"Scaling JobManager Deployment to zero",
-() -> {
-try {
-
jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO);
-} catch (Exception ignore) {
-// Ignore all errors here as this is an optional step
-return null;
-}
-return kubernetesClient
-.pods()
-.inNamespace(namespace)
-
.withLabels(KubernetesUtils.getJobManagerSelectors(clusterId));
-},
-timeout);
+Duration remainingTimeout) {
+
+// We use only half of the shutdown timeout but at most one minute as 
the main point
+// here is to initiate JM shutdown before the TMs
+var jmShutdownTimeout =
+ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, 
remainingTimeout.dividedBy(2));

Review Comment:
   I was missing that a Foreground delete call would clean up TaskManagers 
through delete propagation, even if the JM pod was stuck.
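
The timeout rule described in the code comment above (half of the remaining timeout, capped at one minute) can be sketched with plain `java.time`. This is only an illustration: `JmShutdownTimeoutSketch` and `jmShutdownTimeout` are hypothetical names, and the one-minute value for `JM_SHUTDOWN_MAX_WAIT` is taken from the comment in the diff, not from the operator's source.

```java
import java.time.Duration;

/** Hypothetical helper for illustration only; not operator code. */
final class JmShutdownTimeoutSketch {

    // Assumed cap, following the "at most one minute" wording in the diff comment.
    static final Duration JM_SHUTDOWN_MAX_WAIT = Duration.ofMinutes(1);

    /** Returns half of the remaining timeout, but never more than the cap. */
    static Duration jmShutdownTimeout(Duration remainingTimeout) {
        Duration half = remainingTimeout.dividedBy(2);
        return half.compareTo(JM_SHUTDOWN_MAX_WAIT) < 0 ? half : JM_SHUTDOWN_MAX_WAIT;
    }

    public static void main(String[] args) {
        System.out.println(jmShutdownTimeout(Duration.ofSeconds(30)));
        System.out.println(jmShutdownTimeout(Duration.ofMinutes(10)));
    }
}
```

Capping the JobManager wait keeps most of the budget for the rest of the deletion, which matches the stated goal of merely initiating JM shutdown before the TaskManagers.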






[jira] [Resolved] (FLINK-34419) flink-docker's .github/workflows/snapshot.yml doesn't support JDK 17 and 21

2024-03-27 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl resolved FLINK-34419.
---
Resolution: Fixed

> flink-docker's .github/workflows/snapshot.yml doesn't support JDK 17 and 21
> ---
>
> Key: FLINK-34419
> URL: https://issues.apache.org/jira/browse/FLINK-34419
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Build System / CI
>Reporter: Matthias Pohl
>Assignee: Muhammet Orazov
>Priority: Major
>  Labels: pull-request-available, starter
>
> [.github/workflows/snapshot.yml|https://github.com/apache/flink-docker/blob/master/.github/workflows/snapshot.yml#L40]
>  needs to be updated: JDK 17 support was added in 1.18 (FLINK-15736). JDK 21 
> support was added in 1.19 (FLINK-33163)





[jira] [Comment Edited] (FLINK-34419) flink-docker's .github/workflows/snapshot.yml doesn't support JDK 17 and 21

2024-03-27 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831391#comment-17831391
 ] 

Matthias Pohl edited comment on FLINK-34419 at 3/27/24 2:56 PM:


master: 9e0041a2c9dace4bf3f32815e3e24e24385b179b
dev-master: 1460077743b29e17edd0a2d7efd3897fa097988d
dev-1.19: 67d7c46ed382a665e941f0cf1f1606d10f87dee5
dev-1.18: d93d911b015e535fc2b6f1426c3b36229ff3d02a


was (Author: mapohl):
master: 9e0041a2c9dace4bf3f32815e3e24e24385b179b
dev-master: tba
dev-1.19: tba
dev-1.18: tba

> flink-docker's .github/workflows/snapshot.yml doesn't support JDK 17 and 21
> ---
>
> Key: FLINK-34419
> URL: https://issues.apache.org/jira/browse/FLINK-34419
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Build System / CI
>Reporter: Matthias Pohl
>Assignee: Muhammet Orazov
>Priority: Major
>  Labels: pull-request-available, starter
>
> [.github/workflows/snapshot.yml|https://github.com/apache/flink-docker/blob/master/.github/workflows/snapshot.yml#L40]
>  needs to be updated: JDK 17 support was added in 1.18 (FLINK-15736). JDK 21 
> support was added in 1.19 (FLINK-33163)





Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 [flink-docker]

2024-03-27 Thread via GitHub


XComp merged PR #184:
URL: https://github.com/apache/flink-docker/pull/184





Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]

2024-03-27 Thread via GitHub


XComp merged PR #183:
URL: https://github.com/apache/flink-docker/pull/183





Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]

2024-03-27 Thread via GitHub


XComp merged PR #182:
URL: https://github.com/apache/flink-docker/pull/182





[jira] [Commented] (FLINK-34419) flink-docker's .github/workflows/snapshot.yml doesn't support JDK 17 and 21

2024-03-27 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831391#comment-17831391
 ] 

Matthias Pohl commented on FLINK-34419:
---

master: 9e0041a2c9dace4bf3f32815e3e24e24385b179b
dev-master: tba
dev-1.19: tba
dev-1.18: tba

> flink-docker's .github/workflows/snapshot.yml doesn't support JDK 17 and 21
> ---
>
> Key: FLINK-34419
> URL: https://issues.apache.org/jira/browse/FLINK-34419
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Build System / CI
>Reporter: Matthias Pohl
>Assignee: Muhammet Orazov
>Priority: Major
>  Labels: pull-request-available, starter
>
> [.github/workflows/snapshot.yml|https://github.com/apache/flink-docker/blob/master/.github/workflows/snapshot.yml#L40]
>  needs to be updated: JDK 17 support was added in 1.18 (FLINK-15736). JDK 21 
> support was added in 1.19 (FLINK-33163)





Re: [PR] [FLINK-34419][docker] Added support of JDK 17 & 21 for Flink 1.18+ [flink-docker]

2024-03-27 Thread via GitHub


XComp merged PR #181:
URL: https://github.com/apache/flink-docker/pull/181





Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]

2024-03-27 Thread via GitHub


mxm commented on code in PR #806:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/806#discussion_r1541273623


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
##
@@ -306,34 +310,42 @@ protected Map 
getVertexResources(
 }
 
 /**
- * Scale JM deployment to zero to gracefully stop all JM instances before 
any TMs are stopped.
- * This avoids race conditions between JM shutdown and TM shutdown / 
failure handling.
+ * Shut down JobManagers gracefully by scaling JM deployment to zero. This 
avoids race
+ * conditions between JM shutdown and TM shutdown / failure handling.
  *
  * @param jmDeployment
  * @param namespace
  * @param clusterId
- * @param timeout
+ * @param remainingTimeout
  * @return Remaining timeout after the operation.
  */
-private Duration scaleJmToZeroBlocking(
+private Duration shutdownJobManagers(
 EditReplacePatchable jmDeployment,
 String namespace,
 String clusterId,
-Duration timeout) {
-return deleteBlocking(
-"Scaling JobManager Deployment to zero",
-() -> {
-try {
-
jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO);
-} catch (Exception ignore) {
-// Ignore all errors here as this is an optional step
-return null;
-}
-return kubernetesClient
-.pods()
-.inNamespace(namespace)
-
.withLabels(KubernetesUtils.getJobManagerSelectors(clusterId));
-},
-timeout);
+Duration remainingTimeout) {
+
+// We use only half of the shutdown timeout but at most one minute as 
the main point
+// here is to initiate JM shutdown before the TMs
+var jmShutdownTimeout =
+ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, 
remainingTimeout.dividedBy(2));

Review Comment:
   I'm proposing to keep the core deletion logic as in the main branch, 
together with the other changes in this PR (condition on Foreground deletion).






Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]

2024-03-27 Thread via GitHub


mxm commented on code in PR #806:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/806#discussion_r1541258264


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
##
@@ -306,34 +310,42 @@ protected Map 
getVertexResources(
 }
 
 /**
- * Scale JM deployment to zero to gracefully stop all JM instances before 
any TMs are stopped.
- * This avoids race conditions between JM shutdown and TM shutdown / 
failure handling.
+ * Shut down JobManagers gracefully by scaling JM deployment to zero. This 
avoids race
+ * conditions between JM shutdown and TM shutdown / failure handling.
  *
  * @param jmDeployment
  * @param namespace
  * @param clusterId
- * @param timeout
+ * @param remainingTimeout
  * @return Remaining timeout after the operation.
  */
-private Duration scaleJmToZeroBlocking(
+private Duration shutdownJobManagers(
 EditReplacePatchable jmDeployment,
 String namespace,
 String clusterId,
-Duration timeout) {
-return deleteBlocking(
-"Scaling JobManager Deployment to zero",
-() -> {
-try {
-
jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO);
-} catch (Exception ignore) {
-// Ignore all errors here as this is an optional step
-return null;
-}
-return kubernetesClient
-.pods()
-.inNamespace(namespace)
-
.withLabels(KubernetesUtils.getJobManagerSelectors(clusterId));
-},
-timeout);
+Duration remainingTimeout) {
+
+// We use only half of the shutdown timeout but at most one minute as 
the main point
+// here is to initiate JM shutdown before the TMs
+var jmShutdownTimeout =
+ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, 
remainingTimeout.dividedBy(2));

Review Comment:
   I think I understand what the new delete behavior (in the main branch) is 
trying to solve. I'm just saying that there is no need to time out the scale 
down request after half the timeout (or after one minute). The current behavior 
(in the main branch) is sufficient and avoids unnecessary complexity.
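
For reference, the capped timeout that the diff computes can be sketched in plain Java as below. This is only an illustration under assumptions: the class name is hypothetical, `Duration.compareTo` stands in for the `ObjectUtils.min` call, and the one-minute value of `JM_SHUTDOWN_MAX_WAIT` is inferred from the code comment ("at most one minute"), not from the constant's actual definition.

```java
import java.time.Duration;

// Hypothetical helper; not part of NativeFlinkService.
final class JmShutdownTimeoutSketch {

    // Assumed value, taken from the "at most one minute" code comment in the diff.
    static final Duration JM_SHUTDOWN_MAX_WAIT = Duration.ofMinutes(1);

    /** Returns half of the remaining timeout, capped at JM_SHUTDOWN_MAX_WAIT. */
    static Duration jmShutdownTimeout(Duration remainingTimeout) {
        Duration half = remainingTimeout.dividedBy(2);
        // Pick the smaller of the two durations.
        return half.compareTo(JM_SHUTDOWN_MAX_WAIT) < 0 ? half : JM_SHUTDOWN_MAX_WAIT;
    }

    public static void main(String[] args) {
        System.out.println(jmShutdownTimeout(Duration.ofSeconds(60)));
        System.out.println(jmShutdownTimeout(Duration.ofMinutes(10)));
    }
}
```

With a short remaining timeout the halved value applies; with a long one the cap takes over, which is the extra branch the review comment considers unnecessary.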






Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]

2024-03-27 Thread via GitHub


mxm commented on code in PR #806:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/806#discussion_r1541258264


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
##
@@ -306,34 +310,42 @@ protected Map 
getVertexResources(
 }
 
 /**
- * Scale JM deployment to zero to gracefully stop all JM instances before 
any TMs are stopped.
- * This avoids race conditions between JM shutdown and TM shutdown / 
failure handling.
+ * Shut down JobManagers gracefully by scaling JM deployment to zero. This 
avoids race
+ * conditions between JM shutdown and TM shutdown / failure handling.
  *
  * @param jmDeployment
  * @param namespace
  * @param clusterId
- * @param timeout
+ * @param remainingTimeout
  * @return Remaining timeout after the operation.
  */
-private Duration scaleJmToZeroBlocking(
+private Duration shutdownJobManagers(
 EditReplacePatchable jmDeployment,
 String namespace,
 String clusterId,
-Duration timeout) {
-return deleteBlocking(
-"Scaling JobManager Deployment to zero",
-() -> {
-try {
-
jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO);
-} catch (Exception ignore) {
-// Ignore all errors here as this is an optional step
-return null;
-}
-return kubernetesClient
-.pods()
-.inNamespace(namespace)
-
.withLabels(KubernetesUtils.getJobManagerSelectors(clusterId));
-},
-timeout);
+Duration remainingTimeout) {
+
+// We use only half of the shutdown timeout but at most one minute as 
the main point
+// here is to initiate JM shutdown before the TMs
+var jmShutdownTimeout =
+ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, 
remainingTimeout.dividedBy(2));

Review Comment:
   I think I understand what the new behavior (in the main branch) is trying to 
solve. I'm just saying that there is no need to time out the scale down request 
after half the timeout (or after one minute). The old behavior (in the main 
branch) is sufficient and avoids unnecessary complexity.






Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]

2024-03-27 Thread via GitHub


morazow commented on PR #182:
URL: https://github.com/apache/flink-docker/pull/182#issuecomment-2022958489

   Thanks @XComp, I have updated both pull requests





Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]

2024-03-27 Thread via GitHub


mxm commented on code in PR #806:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/806#discussion_r1541258264


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
##
@@ -306,34 +310,42 @@ protected Map 
getVertexResources(
 }
 
 /**
- * Scale JM deployment to zero to gracefully stop all JM instances before 
any TMs are stopped.
- * This avoids race conditions between JM shutdown and TM shutdown / 
failure handling.
+ * Shut down JobManagers gracefully by scaling JM deployment to zero. This 
avoids race
+ * conditions between JM shutdown and TM shutdown / failure handling.
  *
  * @param jmDeployment
  * @param namespace
  * @param clusterId
- * @param timeout
+ * @param remainingTimeout
  * @return Remaining timeout after the operation.
  */
-private Duration scaleJmToZeroBlocking(
+private Duration shutdownJobManagers(
 EditReplacePatchable jmDeployment,
 String namespace,
 String clusterId,
-Duration timeout) {
-return deleteBlocking(
-"Scaling JobManager Deployment to zero",
-() -> {
-try {
-
jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO);
-} catch (Exception ignore) {
-// Ignore all errors here as this is an optional step
-return null;
-}
-return kubernetesClient
-.pods()
-.inNamespace(namespace)
-
.withLabels(KubernetesUtils.getJobManagerSelectors(clusterId));
-},
-timeout);
+Duration remainingTimeout) {
+
+// We use only half of the shutdown timeout but at most one minute as 
the main point
+// here is to initiate JM shutdown before the TMs
+var jmShutdownTimeout =
+ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, 
remainingTimeout.dividedBy(2));

Review Comment:
   I think I understand what the new behavior is trying to solve. I'm just 
saying that there is no need to time out the scale down request after half the 
timeout (or after one minute). The old behavior (in the main branch) is 
sufficient and avoids unnecessary complexity.






Re: [PR] [BP-1.18][FLINK-34897][test] Enables JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip again. [flink]

2024-03-27 Thread via GitHub


XComp merged PR #24577:
URL: https://github.com/apache/flink/pull/24577





Re: [PR] [BP-1.19][FLINK-34897][test] Enables JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip again. [flink]

2024-03-27 Thread via GitHub


XComp merged PR #24576:
URL: https://github.com/apache/flink/pull/24576





Re: [PR] [FLINK-34897][test] Enables JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip again. [flink]

2024-03-27 Thread via GitHub


XComp merged PR #24546:
URL: https://github.com/apache/flink/pull/24546





[jira] [Resolved] (FLINK-34897) JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip needs to be enabled again

2024-03-27 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl resolved FLINK-34897.
---
Fix Version/s: 1.18.2
   1.20.0
   1.19.1
   Resolution: Fixed

master: 
[0e70d89ad9f807a5816290e9808720e71bdad655|https://github.com/apache/flink/commit/0e70d89ad9f807a5816290e9808720e71bdad655]
1.19: 
[6b5c48ff53ddc6e75056a9050afded2ac44a413a|https://github.com/apache/flink/commit/6b5c48ff53ddc6e75056a9050afded2ac44a413a]
1.18: 
[a6aa569f5005041934a2e6398b6749584beeaabd|https://github.com/apache/flink/commit/a6aa569f5005041934a2e6398b6749584beeaabd]

> JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip
>  needs to be enabled again
> --
>
> Key: FLINK-34897
> URL: https://issues.apache.org/jira/browse/FLINK-34897
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0, 1.18.1, 1.20.0
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
>
> While working on FLINK-34672 I noticed that 
> {{JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip}}
>  is disabled without a reason.
> It looks like I disabled it accidentally as part of FLINK-31783.





Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-03-27 Thread via GitHub


XComp commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1541160078


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java:
##
@@ -34,66 +35,105 @@
 import org.apache.flink.table.sinks.CsvTableSink;
 import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.UUID;
 
-/**
- * End-to-end test for batch SQL queries.
- *
- * The sources are generated and bounded. The result is always constant.
- *
- * Parameters: -outputPath output file path for CsvTableSink; -sqlStatement 
SQL statement that
- * will be executed as executeSql
- */
-public class BatchSQLTestProgram {
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End-to-End tests for Batch SQL tests. */
+@ExtendWith(TestLoggerExtension.class)
+public class BatchSQLTest {

Review Comment:
   ```suggestion
   class BatchSQLTest {
   ```
   JUnit 5 allows test classes to be package-private. This will enable you 
to remove the JavaDoc and still comply with checkstyle. The JavaDoc itself 
doesn't add much value.



##
flink-end-to-end-tests/flink-batch-sql-test/src/test/resources/log4j2-test.properties:
##
@@ -0,0 +1,31 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref=TestLogger
+appender.testlogger.name=TestLogger
+appender.testlogger.type=CONSOLE
+appender.testlogger.target=SYSTEM_ERR
+appender.testlogger.layout.type=PatternLayout
+appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n
+# Uncomment to enable codegen logging
+#loggers = testlogger
+#logger.testlogger.name =org.apache.flink.table.planner.codegen
+#logger.testlogger.level = TRACE
+#logger.testlogger.appenderRefs = TestLogger

Review Comment:
   ```suggestion
   
   # Set root logger level to OFF to not flood build logs
   # set manually to INFO for debugging purposes
   rootLogger.level=OFF
   rootLogger.appenderRef.test.ref=TestLogger
   
   appender.testlogger.name=TestLogger
   appender.testlogger.type=CONSOLE
   appender.testlogger.target=SYSTEM_ERR
   appender.testlogger.layout.type=PatternLayout
   appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n
   
   # Uncomment to enable codegen logging
   #loggers = testlogger
   #logger.testlogger.name =org.apache.flink.table.planner.codegen
   #logger.testlogger.level = TRACE
   #logger.testlogger.appenderRefs = TestLogger
   ```
   Then let's add some empty lines in between



##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java:
##
@@ -34,66 +35,105 @@
 import org.apache.flink.table.sinks.CsvTableSink;
 import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;

[jira] [Commented] (FLINK-34953) Add github ci for flink-web to auto commit build files

2024-03-27 Thread Zhongqiang Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831367#comment-17831367
 ] 

Zhongqiang Gong commented on FLINK-34953:
-

[~martijnvisser] Thanks for your patience in explaining.

> Add github ci for flink-web to auto commit build files
> --
>
> Key: FLINK-34953
> URL: https://issues.apache.org/jira/browse/FLINK-34953
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Reporter: Zhongqiang Gong
>Priority: Minor
>  Labels: website
>
> Currently, build files for https://github.com/apache/flink-web are committed 
> from local builds. So I want to use GitHub CI to build the docs and commit them.
>  
> Changes:
>  * Add a website build check for PRs
>  * Automatically build and commit build files after a PR is merged to `asf-site`
>  * Optional: this CI can also be triggered manually





[jira] [Commented] (FLINK-34953) Add github ci for flink-web to auto commit build files

2024-03-27 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831359#comment-17831359
 ] 

Martijn Visser commented on FLINK-34953:


[~gongzhongqiang]  ASF policies don't allow anyone else besides committers (not 
even CI) to commit files. So this can't happen

> Add github ci for flink-web to auto commit build files
> --
>
> Key: FLINK-34953
> URL: https://issues.apache.org/jira/browse/FLINK-34953
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Reporter: Zhongqiang Gong
>Priority: Minor
>  Labels: website
>
> Currently, build files for https://github.com/apache/flink-web are committed 
> from local builds. So I want to use GitHub CI to build the docs and commit them.
>  
> Changes:
>  * Add a website build check for PRs
>  * Automatically build and commit build files after a PR is merged to `asf-site`
>  * Optional: this CI can also be triggered manually





[jira] [Closed] (FLINK-34953) Add github ci for flink-web to auto commit build files

2024-03-27 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-34953.
--
Resolution: Invalid

> Add github ci for flink-web to auto commit build files
> --
>
> Key: FLINK-34953
> URL: https://issues.apache.org/jira/browse/FLINK-34953
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Reporter: Zhongqiang Gong
>Priority: Minor
>  Labels: website
>
> Currently, build files for https://github.com/apache/flink-web are committed 
> from local builds. So I want to use GitHub CI to build the docs and commit them.
>  
> Changes:
>  * Add a website build check for PRs
>  * Automatically build and commit build files after a PR is merged to `asf-site`
>  * Optional: this CI can also be triggered manually





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-03-27 Thread via GitHub


MartijnVisser commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2022804099

   > What is your opinion on how the function should behave?
   
   I've taken a look at how INTERSECT is defined in the SQL standard. Based on 
https://stackoverflow.com/questions/59060599/does-intersect-operator-exist-in-the-sql-standar,
  https://www.postgresql.org/docs/current/queries-union.html, and the fact that 
Calcite differentiates between INTERSECT and INTERSECT ALL, I believe 
that the default behavior of INTERSECT is to remove duplicates. 
   
   So the result of `[1, 1, 1, 2] INTERSECT [1, 1, 2]` should be 
`[1, 2]` in my understanding. I think that Spark/Databricks/Presto 
implement the correct behavior. 
   
   BigQuery and Redshift don't support ARRAY_INTERSECT. ksqlDB follows the same 
behavior as Spark/Databricks/Presto per 
https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/scalar-functions/#array_intersect.
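
To make the duplicate-removing semantics concrete, here is a minimal plain-Java sketch. It is not the Flink implementation; the class and method names are hypothetical. It only illustrates the set-style behavior described above, keeping the first-seen order of the left array.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of set-style (duplicate-removing) array intersection.
final class ArrayIntersectSketch {

    /** Returns the distinct elements of left that also occur in right. */
    static <T> List<T> intersectDistinct(List<T> left, List<T> right) {
        // LinkedHashSet drops duplicates while keeping first-seen order.
        Set<T> result = new LinkedHashSet<>(left);
        // Keep only elements that are also present in the right array.
        result.retainAll(new HashSet<>(right));
        return new ArrayList<>(result);
    }

    public static void main(String[] args) {
        // [1, 1, 1, 2] INTERSECT [1, 1, 2]
        System.out.println(intersectDistinct(List.of(1, 1, 1, 2), List.of(1, 1, 2)));
    }
}
```

An INTERSECT ALL variant would instead keep each element as many times as it appears in both inputs, which is the distinction Calcite draws.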
 





Re: [PR] [FLINK-33376][coordination] Extend ZooKeeper Curator configurations [flink]

2024-03-27 Thread via GitHub


XComp commented on code in PR #24563:
URL: https://github.com/apache/flink/pull/24563#discussion_r1541117995


##
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java:
##
@@ -246,6 +249,38 @@ public static CuratorFrameworkWithUnhandledErrorListener 
startCuratorFramework(
 .ensembleTracker(ensembleTracking)
 .aclProvider(aclProvider);
 
+if 
(configuration.contains(HighAvailabilityOptions.ZOOKEEPER_CLIENT_AUTHORIZATION))
 {
+Map authMap =
+
configuration.get(HighAvailabilityOptions.ZOOKEEPER_CLIENT_AUTHORIZATION);
+List authInfos =
+authMap.entrySet().stream()
+.map(
+entry ->
+new AuthInfo(
+entry.getKey(),
+entry.getValue()
+.getBytes(
+
ConfigConstants
+
.DEFAULT_CHARSET)))
+.collect(Collectors.toList());
+curatorFrameworkBuilder.authorization(authInfos);
+}
+
+if 
(configuration.contains(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT_MS)) {
+curatorFrameworkBuilder.maxCloseWaitMs(
+(int)
+configuration
+
.get(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT_MS)
+.toMillis());

Review Comment:
   You have a point there. It appears to be quite nitpicky. But on the other 
hand, it's also quite unlikely that a user would set such a timeout. Therefore, 
an exception wouldn't hurt. :shrug: 



##
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java:
##
@@ -246,6 +249,38 @@ public static CuratorFrameworkWithUnhandledErrorListener 
startCuratorFramework(
 .ensembleTracker(ensembleTracking)
 .aclProvider(aclProvider);
 
+if 
(configuration.contains(HighAvailabilityOptions.ZOOKEEPER_CLIENT_AUTHORIZATION))
 {
+Map authMap =
+
configuration.get(HighAvailabilityOptions.ZOOKEEPER_CLIENT_AUTHORIZATION);
+List authInfos =
+authMap.entrySet().stream()
+.map(
+entry ->
+new AuthInfo(
+entry.getKey(),
+entry.getValue()
+.getBytes(
+
ConfigConstants
+
.DEFAULT_CHARSET)))
+.collect(Collectors.toList());
+curatorFrameworkBuilder.authorization(authInfos);
+}
+
+if 
(configuration.contains(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT)) {
+long maxCloseWait = 
configuration.get(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT).toMillis();
+if (maxCloseWait < Integer.MIN_VALUE || maxCloseWait > 
Integer.MAX_VALUE) {
+throw new 
IllegalConfigurationException(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT.key()
 + " in ms is not an integer - " + maxCloseWait);

Review Comment:
   ```suggestion
   if (maxCloseWait < 0 || maxCloseWait > Integer.MAX_VALUE) {
   throw new IllegalConfigurationException(
   "The value (%d ms) is out-of-range for %s. The 
milliseconds timeout is expected to be between 0 and %d ms.",
   maxCloseWait,
   
HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT.key(),
   Integer.MAX_VALUE);
   }
   ```
   nit: since there is a constructor for formatted strings.
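
A standalone sketch of the same range check, for anyone who wants to try it outside Flink. The assumptions: `IllegalArgumentException` with `String.format` stands in for Flink's `IllegalConfigurationException`, and the class name and option key used here are hypothetical.

```java
import java.time.Duration;

// Hypothetical helper; IllegalArgumentException replaces Flink's IllegalConfigurationException.
final class TimeoutRangeCheckSketch {

    /** Converts a timeout to int milliseconds, rejecting values a plain cast would corrupt. */
    static int toIntMillis(Duration timeout, String optionKey) {
        long millis = timeout.toMillis();
        if (millis < 0 || millis > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                    String.format(
                            "The value (%d ms) is out-of-range for %s. The milliseconds "
                                    + "timeout is expected to be between 0 and %d ms.",
                            millis, optionKey, Integer.MAX_VALUE));
        }
        // Safe: the value is known to fit into an int at this point.
        return (int) millis;
    }

    public static void main(String[] args) {
        System.out.println(toIntMillis(Duration.ofSeconds(5), "example.option.key"));
    }
}
```

Without the check, a bare `(int)` cast of a value above `Integer.MAX_VALUE` would silently wrap around instead of failing.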



##
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java:
##
@@ -246,6 +249,38 @@ public static CuratorFrameworkWithUnhandledErrorListener 
startCuratorFramework(
 .ensembleTracker(ensembleTracking)
 .aclProvider(aclProvider);
 
+if 
(configuration.contains(HighAvailabilityOptions.ZOOKEEPER_CLIENT_AUTHORIZATION))
 {
+Map authMap =
+
configuration.get(HighAvailabilityOptions.ZOOKEEPER_CLIENT_AUTHORIZATION);
+List authInfos =
+authMap.entrySet().stream()
+.map(
+entry ->
+  

Re: [PR] [FLINK-33970][jdbc][docs] Remove dead link [flink-connector-jdbc]

2024-03-27 Thread via GitHub


GOODBOY008 commented on PR #88:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/88#issuecomment-2022772500

   @MartijnVisser PTAL , Thank you~





Re: [PR] [FLINK-34934] Translation Flink-Kubernetes-Operator document framework construction [flink-kubernetes-operator]

2024-03-27 Thread via GitHub


caicancai commented on PR #805:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/805#issuecomment-2022758286

   @1996fanrui  done
   ![2024-03-27 
21-24-15屏幕截图](https://github.com/apache/flink-kubernetes-operator/assets/77189278/c2973efd-a1af-4fc4-afbb-08f53c7fdbeb)
   





Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]

2024-03-27 Thread via GitHub


gyfora commented on code in PR #806:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/806#discussion_r1541081443


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
##
@@ -306,34 +310,42 @@ protected Map 
getVertexResources(
 }
 
 /**
- * Scale JM deployment to zero to gracefully stop all JM instances before 
any TMs are stopped.
- * This avoids race conditions between JM shutdown and TM shutdown / 
failure handling.
+ * Shut down JobManagers gracefully by scaling JM deployment to zero. This 
avoids race
+ * conditions between JM shutdown and TM shutdown / failure handling.
  *
  * @param jmDeployment
  * @param namespace
  * @param clusterId
- * @param timeout
+ * @param remainingTimeout
  * @return Remaining timeout after the operation.
  */
-private Duration scaleJmToZeroBlocking(
+private Duration shutdownJobManagers(
 EditReplacePatchable jmDeployment,
 String namespace,
 String clusterId,
-Duration timeout) {
-return deleteBlocking(
-"Scaling JobManager Deployment to zero",
-() -> {
-try {
-
jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO);
-} catch (Exception ignore) {
-// Ignore all errors here as this is an optional step
-return null;
-}
-return kubernetesClient
-.pods()
-.inNamespace(namespace)
-
.withLabels(KubernetesUtils.getJobManagerSelectors(clusterId));
-},
-timeout);
+Duration remainingTimeout) {
+
+// We use only half of the shutdown timeout but at most one minute as 
the main point
+// here is to initiate JM shutdown before the TMs
+var jmShutdownTimeout =
+ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, 
remainingTimeout.dividedBy(2));

Review Comment:
   Clean blocking deletion is always preferred because of issues like 
https://issues.apache.org/jira/browse/FLINK-32334 .
   It is always better to wait for the deployments/pods to go away before doing 
anything else; otherwise strange things can happen, such as JMs/TMs reconnecting 
to new clusters.
   






Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]

2024-03-27 Thread via GitHub


gyfora commented on code in PR #806:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/806#discussion_r1541074102


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
##
@@ -306,34 +310,42 @@ protected Map 
getVertexResources(
 }
 
 /**
- * Scale JM deployment to zero to gracefully stop all JM instances before 
any TMs are stopped.
- * This avoids race conditions between JM shutdown and TM shutdown / 
failure handling.
+ * Shut down JobManagers gracefully by scaling JM deployment to zero. This 
avoids race
+ * conditions between JM shutdown and TM shutdown / failure handling.
  *
  * @param jmDeployment
  * @param namespace
  * @param clusterId
- * @param timeout
+ * @param remainingTimeout
  * @return Remaining timeout after the operation.
  */
-private Duration scaleJmToZeroBlocking(
+private Duration shutdownJobManagers(
 EditReplacePatchable jmDeployment,
 String namespace,
 String clusterId,
-Duration timeout) {
-return deleteBlocking(
-"Scaling JobManager Deployment to zero",
-() -> {
-try {
-
jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO);
-} catch (Exception ignore) {
-// Ignore all errors here as this is an optional step
-return null;
-}
-return kubernetesClient
-.pods()
-.inNamespace(namespace)
-
.withLabels(KubernetesUtils.getJobManagerSelectors(clusterId));
-},
-timeout);
+Duration remainingTimeout) {
+
+// We use only half of the shutdown timeout but at most one minute as 
the main point
+// here is to initiate JM shutdown before the TMs
+var jmShutdownTimeout =
+ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, 
remainingTimeout.dividedBy(2));

Review Comment:
   This is not really user-facing logic, in the sense that the only thing the 
user cares about is a clean shutdown within the configured total shutdown 
timeout.
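
   A minimal sketch of the timeout split described in the code comment above 
(half of the remaining timeout, capped at one minute). The class name and the 
one-minute value of JM_SHUTDOWN_MAX_WAIT are assumptions for illustration; the 
operator code itself uses ObjectUtils.min, which is replaced here by a plain 
comparison to stay JDK-only.

```java
import java.time.Duration;

final class JmShutdownBudget {

    // Assumed cap, taken from the "at most one minute" wording in the diff comment.
    static final Duration JM_SHUTDOWN_MAX_WAIT = Duration.ofMinutes(1);

    /** Returns the smaller of the cap and half of the remaining timeout. */
    static Duration jmShutdownTimeout(Duration remainingTimeout) {
        Duration half = remainingTimeout.dividedBy(2);
        return half.compareTo(JM_SHUTDOWN_MAX_WAIT) < 0 ? half : JM_SHUTDOWN_MAX_WAIT;
    }

    public static void main(String[] args) {
        // Short budget: half of 30 seconds is below the cap.
        System.out.println(jmShutdownTimeout(Duration.ofSeconds(30))); // PT15S
        // Long budget: half of 10 minutes exceeds the cap, so the cap applies.
        System.out.println(jmShutdownTimeout(Duration.ofMinutes(10))); // PT1M
    }
}
```

   With this split, the JM shutdown step can never consume more than half of 
the total budget, so the remainder stays available for the deletion that 
follows.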






Re: [PR] [FLINK-33376][coordination] Extend ZooKeeper Curator configurations [flink]

2024-03-27 Thread via GitHub


JTaky commented on code in PR #24563:
URL: https://github.com/apache/flink/pull/24563#discussion_r1541075870


##
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java:
##
@@ -246,6 +249,38 @@ public static CuratorFrameworkWithUnhandledErrorListener 
startCuratorFramework(
 .ensembleTracker(ensembleTracking)
 .aclProvider(aclProvider);
 
+if 
(configuration.contains(HighAvailabilityOptions.ZOOKEEPER_CLIENT_AUTHORIZATION))
 {
+Map authMap =
+
configuration.get(HighAvailabilityOptions.ZOOKEEPER_CLIENT_AUTHORIZATION);
+List authInfos =
+authMap.entrySet().stream()
+.map(
+entry ->
+new AuthInfo(
+entry.getKey(),
+entry.getValue()
+.getBytes(
+
ConfigConstants
+
.DEFAULT_CHARSET)))
+.collect(Collectors.toList());
+curatorFrameworkBuilder.authorization(authInfos);
+}
+
+if 
(configuration.contains(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT_MS)) {
+curatorFrameworkBuilder.maxCloseWaitMs(
+(int)
+configuration
+
.get(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT_MS)
+.toMillis());

Review Comment:
   I am hesitant to throw an exception here, but it could help somebody who 
configures a wrong duration value, e.g. in days (Integer.MAX_VALUE in ms is 
more than 500 hours):
   ```
   if (maxCloseWait < Integer.MIN_VALUE || maxCloseWait > Integer.MAX_VALUE) {
       throw new IllegalConfigurationException(
               HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT.key()
                       + " in ms is not an integer - " + maxCloseWait);
   }
   ```
   
   As an alternative, we could log a WARN and silently limit the value to 
Integer.MAX_VALUE, although making something implicit is always bad practice.
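
   To make the two options concrete, here is a small JDK-only sketch of both 
conversions. The class and method names are hypothetical, the option key is a 
placeholder, and IllegalArgumentException stands in for Flink's 
IllegalConfigurationException; it is not the code of the PR.

```java
import java.time.Duration;

final class MaxCloseWaitConversion {

    /** Strict variant: reject values that do not fit into an int (in milliseconds). */
    static int toIntMillisStrict(Duration value, String optionKey) {
        long millis = value.toMillis();
        if (millis < Integer.MIN_VALUE || millis > Integer.MAX_VALUE) {
            // Stand-in for Flink's IllegalConfigurationException (assumption, JDK-only).
            throw new IllegalArgumentException(
                    optionKey + " in ms is not an integer - " + millis);
        }
        return (int) millis;
    }

    /** Lenient variant: clamp to the int range; a real implementation would also log a WARN. */
    static int toIntMillisClamped(Duration value) {
        long millis = value.toMillis();
        return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, millis));
    }

    public static void main(String[] args) {
        Duration thirtyDays = Duration.ofDays(30);
        // The unchecked cast from the diff silently overflows for large durations.
        System.out.println((int) thirtyDays.toMillis());    // -1702967296
        System.out.println(toIntMillisClamped(thirtyDays)); // 2147483647
        // "example.max-close-wait" is a placeholder key, not a real Flink option.
        System.out.println(toIntMillisStrict(Duration.ofSeconds(5), "example.max-close-wait")); // 5000
    }
}
```

   The first printed value shows why some check is useful at all: without it, 
a 30-day duration becomes a negative wait time.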






Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]

2024-03-27 Thread via GitHub


gyfora commented on code in PR #806:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/806#discussion_r1541072245


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
##
@@ -306,34 +310,42 @@ protected Map 
getVertexResources(
 }
 
 /**
- * Scale JM deployment to zero to gracefully stop all JM instances before 
any TMs are stopped.
- * This avoids race conditions between JM shutdown and TM shutdown / 
failure handling.
+ * Shut down JobManagers gracefully by scaling JM deployment to zero. This 
avoids race
+ * conditions between JM shutdown and TM shutdown / failure handling.
  *
  * @param jmDeployment
  * @param namespace
  * @param clusterId
- * @param timeout
+ * @param remainingTimeout
  * @return Remaining timeout after the operation.
  */
-private Duration scaleJmToZeroBlocking(
+private Duration shutdownJobManagers(
 EditReplacePatchable jmDeployment,
 String namespace,
 String clusterId,
-Duration timeout) {
-return deleteBlocking(
-"Scaling JobManager Deployment to zero",
-() -> {
-try {
-
jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO);
-} catch (Exception ignore) {
-// Ignore all errors here as this is an optional step
-return null;
-}
-return kubernetesClient
-.pods()
-.inNamespace(namespace)
-
.withLabels(KubernetesUtils.getJobManagerSelectors(clusterId));
-},
-timeout);
+Duration remainingTimeout) {
+
+// We use only half of the shutdown timeout but at most one minute as 
the main point
+// here is to initiate JM shutdown before the TMs
+var jmShutdownTimeout =
+ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, 
remainingTimeout.dividedBy(2));

Review Comment:
   I don't fully understand what you mean by non-blocking delete. What we gain 
here by shutting down the JM first is cleaner logs and a generally faster 
shutdown of the TM pods, which overall leads to a more stable shutdown of the 
entire job.






Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]

2024-03-27 Thread via GitHub


mxm commented on code in PR #806:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/806#discussion_r1541062926


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
##
@@ -306,34 +310,42 @@ protected Map 
getVertexResources(
 }
 
 /**
- * Scale JM deployment to zero to gracefully stop all JM instances before 
any TMs are stopped.
- * This avoids race conditions between JM shutdown and TM shutdown / 
failure handling.
+ * Shut down JobManagers gracefully by scaling JM deployment to zero. This 
avoids race
+ * conditions between JM shutdown and TM shutdown / failure handling.
  *
  * @param jmDeployment
  * @param namespace
  * @param clusterId
- * @param timeout
+ * @param remainingTimeout
  * @return Remaining timeout after the operation.
  */
-private Duration scaleJmToZeroBlocking(
+private Duration shutdownJobManagers(
 EditReplacePatchable jmDeployment,
 String namespace,
 String clusterId,
-Duration timeout) {
-return deleteBlocking(
-"Scaling JobManager Deployment to zero",
-() -> {
-try {
-
jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO);
-} catch (Exception ignore) {
-// Ignore all errors here as this is an optional step
-return null;
-}
-return kubernetesClient
-.pods()
-.inNamespace(namespace)
-
.withLabels(KubernetesUtils.getJobManagerSelectors(clusterId));
-},
-timeout);
+Duration remainingTimeout) {
+
+// We use only half of the shutdown timeout but at most one minute as 
the main point
+// here is to initiate JM shutdown before the TMs
+var jmShutdownTimeout =
+ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, 
remainingTimeout.dividedBy(2));

Review Comment:
   This is complicating the logic quite a bit and may be non-obvious to users. 
What do we really gain from deleting the JM deployment earlier? The TaskManager 
may be recycled quicker, but the JM shutdown speed isn't affected.



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
##
@@ -306,34 +310,42 @@ protected Map 
getVertexResources(
 }
 
 /**
- * Scale JM deployment to zero to gracefully stop all JM instances before 
any TMs are stopped.
- * This avoids race conditions between JM shutdown and TM shutdown / 
failure handling.
+ * Shut down JobManagers gracefully by scaling JM deployment to zero. This 
avoids race
+ * conditions between JM shutdown and TM shutdown / failure handling.
  *
  * @param jmDeployment
  * @param namespace
  * @param clusterId
- * @param timeout
+ * @param remainingTimeout
  * @return Remaining timeout after the operation.
  */
-private Duration scaleJmToZeroBlocking(
+private Duration shutdownJobManagers(

Review Comment:
   ```suggestion
   private Duration shutdownJobManagersBlocking(
   ```
   
   Just to keep the general pattern, I think it's helpful to the reader.



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
##
@@ -306,34 +310,42 @@ protected Map 
getVertexResources(
 }
 
 /**
- * Scale JM deployment to zero to gracefully stop all JM instances before 
any TMs are stopped.
- * This avoids race conditions between JM shutdown and TM shutdown / 
failure handling.
+ * Shut down JobManagers gracefully by scaling JM deployment to zero. This 
avoids race
+ * conditions between JM shutdown and TM shutdown / failure handling.
  *
  * @param jmDeployment
  * @param namespace
  * @param clusterId
- * @param timeout
+ * @param remainingTimeout
  * @return Remaining timeout after the operation.
  */
-private Duration scaleJmToZeroBlocking(
+private Duration shutdownJobManagers(
 EditReplacePatchable jmDeployment,
 String namespace,
 String clusterId,
-Duration timeout) {
-return deleteBlocking(
-"Scaling JobManager Deployment to zero",
-() -> {
-try {
-
jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO);
-} catch (Exception ignore) {
-// Ignore all errors here as this is an optional step
-return null;
-}
-return kubernetesClient
-.pods()
-

[jira] [Comment Edited] (FLINK-34670) The asyncOperationsThreadPool in SubtaskCheckpointCoordinatorImpl can only create one worker thread

2024-03-27 Thread Jinzhong Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831334#comment-17831334
 ] 

Jinzhong Li edited comment on FLINK-34670 at 3/27/24 12:57 PM:
---

[~roman]    [~pnowojski]  I think this is a critical bug that will result in 
concurrent checkpoints being forced to execute sequentially, as well as causing 
a drastic performance regression of checkpoint aborts. Could you please help 
confirm this problem?


was (Author: lijinzhong):
[~roman]    [~pnowojski]  I think this is a critical bug that will result in 
concurrent checkpoints being forced to execute sequentially, as well as causing 
a drastic performance regression of checkpoint aborts. Could you please take a 
look at it?

> The asyncOperationsThreadPool in SubtaskCheckpointCoordinatorImpl can only 
> create one worker thread
> ---
>
> Key: FLINK-34670
> URL: https://issues.apache.org/jira/browse/FLINK-34670
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0, 1.19.0
>Reporter: Jinzhong Li
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: image-2024-03-14-20-24-14-198.png, 
> image-2024-03-14-20-27-37-540.png, image-2024-03-14-20-33-28-851.png
>
>
> Now, the asyncOperations ThreadPoolExecutor of 
> SubtaskCheckpointCoordinatorImpl is created with a LinkedBlockingQueue and 
> zero corePoolSize.
> !image-2024-03-14-20-24-14-198.png|width=614,height=198!
> And in the ThreadPoolExecutor, except for the first time a task is 
> submitted, *no* new thread is created until the queue is full. But the 
> capacity of LinkedBlockingQueue is Integer.MAX_VALUE. This means that there is 
> almost *only one thread* working in this thread pool, *even if* {*}there are 
> many concurrent checkpoint requests or checkpoint abort requests waiting to 
> be processed{*}.
> !image-2024-03-14-20-27-37-540.png|width=614,height=175!
> This problem can be verified by changing ExecutorService implementation in UT 
> SubtaskCheckpointCoordinatorTest#testNotifyCheckpointAbortedDuringAsyncPhase. 
> When the LinkedBlockingQueue and zero corePoolSize are configured, this UT 
> will deadlock because only one worker thread can be created.
> !image-2024-03-14-20-33-28-851.png|width=606,height=235!
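
The behaviour described in the issue can be reproduced with the JDK alone. The 
following is a minimal sketch, not Flink code: the class and method names are 
made up for illustration, and the second configuration (non-zero 
corePoolSize) is only shown for contrast, not as the fix proposed in the 
ticket.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SingleWorkerDemo {

    /** Submits blocking tasks and returns the largest number of worker threads ever created. */
    static int largestPoolSize(int corePoolSize, int maxPoolSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>()); // unbounded: capacity is Integer.MAX_VALUE
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the worker busy until all tasks are submitted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int largest = pool.getLargestPoolSize();
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return largest;
    }

    public static void main(String[] args) {
        // Zero core threads + unbounded queue: extra tasks are queued, never given new threads.
        System.out.println(largestPoolSize(0, 8, 4)); // 1
        // Non-zero core size: a thread is started per task until corePoolSize is reached.
        System.out.println(largestPoolSize(4, 8, 4)); // 4
    }
}
```

In the first call, three of the four tasks wait in the queue behind the single 
worker even though maximumPoolSize is 8, which matches the sequential 
execution described above.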



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34565) Enhance flink kubernetes configMap to accommodate additional configuration files

2024-03-27 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831336#comment-17831336
 ] 

Zhu Zhu commented on FLINK-34565:
-

IIUC, the requirement is to ship more user files, which may be needed by user 
code, to the pod. Supporting configuration files is just a special case of it. 
Shipping them via ConfigMap sounds a bit tricky to me.
cc [~wangyang0918]

> Enhance flink kubernetes configMap to accommodate additional configuration 
> files
> 
>
> Key: FLINK-34565
> URL: https://issues.apache.org/jira/browse/FLINK-34565
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Reporter: Surendra Singh Lilhore
>Priority: Major
>  Labels: pull-request-available
>
> Flink kubernetes client currently supports a fixed number of files 
> (flink-conf.yaml, logback-console.xml, log4j-console.properties) in the JM 
> and TM Pod ConfigMap. In certain scenarios, particularly in app mode, 
> additional configuration files are required for jobs to run successfully. 
> Presently, users must resort to workarounds to include dynamic configuration 
> files in the JM and TM. This proposed improvement allows users to easily add 
> extra files by configuring the 
> '{*}kubernetes.flink.configmap.additional.resources{*}' property. Users can 
> provide a semicolon-separated list of local files in the client Flink config 
> directory that should be included in the Flink ConfigMap.





Re: [PR] [FLINK-33982][core] Introduce new config options for Batch Job Recovery [flink]

2024-03-27 Thread via GitHub


JunRuiLee commented on PR #24026:
URL: https://github.com/apache/flink/pull/24026#issuecomment-2022702614

   @zhuzhurk Thanks for your review, I've updated this pr accordingly, PTAL.





[jira] [Commented] (FLINK-34670) The asyncOperationsThreadPool in SubtaskCheckpointCoordinatorImpl can only create one worker thread

2024-03-27 Thread Jinzhong Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831334#comment-17831334
 ] 

Jinzhong Li commented on FLINK-34670:
-

[~roman]    [~pnowojski]  I think this is a critical bug that will result in 
concurrent checkpoints being forced to execute sequentially, as well as causing 
a drastic performance regression of checkpoint aborts. Could you please take a 
look at it?

> The asyncOperationsThreadPool in SubtaskCheckpointCoordinatorImpl can only 
> create one worker thread
> ---
>
> Key: FLINK-34670
> URL: https://issues.apache.org/jira/browse/FLINK-34670
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0, 1.19.0
>Reporter: Jinzhong Li
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: image-2024-03-14-20-24-14-198.png, 
> image-2024-03-14-20-27-37-540.png, image-2024-03-14-20-33-28-851.png
>
>
> Now, the asyncOperations ThreadPoolExecutor of 
> SubtaskCheckpointCoordinatorImpl is created with a LinkedBlockingQueue and 
> zero corePoolSize.
> !image-2024-03-14-20-24-14-198.png|width=614,height=198!
> And in the ThreadPoolExecutor, except for the first time a task is 
> submitted, *no* new thread is created until the queue is full. But the 
> capacity of LinkedBlockingQueue is Integer.MAX_VALUE. This means that there is 
> almost *only one thread* working in this thread pool, *even if* {*}there are 
> many concurrent checkpoint requests or checkpoint abort requests waiting to 
> be processed{*}.
> !image-2024-03-14-20-27-37-540.png|width=614,height=175!
> This problem can be verified by changing ExecutorService implementation in UT 
> SubtaskCheckpointCoordinatorTest#testNotifyCheckpointAbortedDuringAsyncPhase. 
> When the LinkedBlockingQueue and zero corePoolSize are configured, this UT 
> will deadlock because only one worker thread can be created.
> !image-2024-03-14-20-33-28-851.png|width=606,height=235!





Re: [PR] [FLINK-34906] Only scale when all tasks are running [flink-kubernetes-operator]

2024-03-27 Thread via GitHub


mxm commented on PR #801:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/801#issuecomment-2022693008

   Thanks Rui! The changes make sense to me. To Gyula's point, I think we should 
try to deduplicate the logic such that both the Kubernetes autoscaler and the 
standalone autoscaler use the same code path.





Re: [PR] [FLINK-34444] Initial implementation of JM operator metric rest api [flink]

2024-03-27 Thread via GitHub


mxm commented on code in PR #24564:
URL: https://github.com/apache/flink/pull/24564#discussion_r1541043451


##
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##
@@ -538,19 +556,19 @@ public TaskMetricStore getTaskMetricStore(String taskID) {
 @ThreadSafe
 public static class TaskMetricStore extends ComponentMetricStore {
 private final Map subtasks;
-private final Map jmOperators;
+private final ComponentMetricStore jmOperator;

Review Comment:
   After reading the rest of the code, this seems to be the case. Retrieving 
all the coordinator metrics at once for a task seems reasonable.






Re: [PR] [FLINK-34444] Initial implementation of JM operator metric rest api [flink]

2024-03-27 Thread via GitHub


mxm commented on code in PR #24564:
URL: https://github.com/apache/flink/pull/24564#discussion_r1541028016


##
flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java:
##
@@ -277,11 +277,12 @@ public static Configuration forReporter(Configuration 
configuration, String repo
  * JobManager of an operator.
  */
 public static final ConfigOption SCOPE_NAMING_JM_OPERATOR =
-key("metrics.scope.jm-operator")
+key("metrics.scope.coordinator")

Review Comment:
   Do we want to move ahead with this change? If so, we should also change the 
naming internally.



##
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##
@@ -538,19 +556,19 @@ public TaskMetricStore getTaskMetricStore(String taskID) {
 @ThreadSafe
 public static class TaskMetricStore extends ComponentMetricStore {
 private final Map subtasks;
-private final Map jmOperators;
+private final ComponentMetricStore jmOperator;

Review Comment:
   Can't there be multiple coordinators per task? A task can host multiple 
operators, each with their own coordinator. Is the idea to coalesce all of them 
into one metric store?





