Re: [PR] [FLINK-33500][Runtime] Run storing the JobGraph an asynchronous operation [flink]

2023-12-05 Thread via GitHub


zhengzhili333 commented on PR #23866:
URL: https://github.com/apache/flink/pull/23866#issuecomment-1840210958

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-24150][k8s] Support to configure cpu resource request and limit in pod template [flink]

2023-12-05 Thread via GitHub


Sucran commented on PR #23872:
URL: https://github.com/apache/flink/pull/23872#issuecomment-1840275371

   > -1 from my side based on previous community discussion:
   > 
   > Please see the discussion in 
https://lists.apache.org/thread/6p5tk6obmk1qxf169so498z4vk8cg969 and the 
ticket: https://issues.apache.org/jira/browse/FLINK-33548
   > 
   > We should follow the approach outlined there. If you are interested you 
are welcome to pick up the operator ticket.
   > 
   > Unfortunately your PR can be a large unexpected change to existing users 
so we should not add it.
   
   yes, it may affect existing users, but from my side, if a user had done that 
before, they must be very confused about why their pod template wasn't being 
considered, just like me.





[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-12-05 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793189#comment-17793189
 ] 

Hangxiang Yu commented on FLINK-27681:
--

Sorry for the late reply.
{quote}However, the job must fail in the future (when the corrupted block is 
read or compacted, or the number of failed checkpoints >= tolerable-failed-checkpoint). 
Then it will roll back to the older checkpoint.

The older checkpoint must predate the moment we found the file was corrupted. 
Therefore, it is useless to run the job between the time the corruption is 
discovered and the time the job actually fails.

In brief, tolerable-failed-checkpoint can work, but the extra cost isn't 
necessary.

BTW, if we fail the job directly, this 
[comment|https://github.com/apache/flink/pull/23765#discussion_r1404136470] 
will be resolved directly.
{quote}
Thanks for the detailed clarification.

I rethought this; it seems that failing the job is more reasonable than failing 
the current checkpoint. I'm +1 if we could do that.
{quote}That's a non trivial overhead. Prolonging checkpoint for 10s in many 
cases (especially low throughput large state jobs) will be prohibitively 
expensive, delaying rescaling, e2e exactly once latency, etc. 1s+ for 1GB might 
also be less than ideal to enable by default.
{quote}
Couldn't agree more.
{quote}Actually, don't all of the disks basically have some form of CRC these 
days? I'm certain that's true about SSDs. Having said that, can you 
[~masteryhx] rephrase and elaborate on those 3 scenarios that you think we need 
to protect from? Especially where does the corruption happen?
{quote}
IIUC, once we perform IO operations on the SST files, a file may become 
corrupted, even if this happens very rarely.

RocksDB also describes some situations for using the full file checksum [1] 
that are related to our usage:
 # Local file which is prepared for upload: as noted in [1] ("verify the SST 
file when the whole file is read in DB (e.g., compaction)"), block-level 
checksums at runtime cannot guarantee the correctness of the whole SST file, 
which is what we could focus on at first.
 # Uploading and downloading: firstly, disk IO and network IO may corrupt the 
data. Secondly, remote storage is not always reliable. So the checksum could 
be used when SST files are copied to other places (e.g., backup, move, or 
replicate) or stored remotely.

IIUC, a checksum at the SST level could guarantee the correctness of the local 
file, and a checksum at the filesystem level could guarantee the correctness of 
uploading and downloading.

[1] 
https://github.com/facebook/rocksdb/wiki/Full-File-Checksum-and-Checksum-Handoff
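
To make the filesystem-level check concrete, here is a minimal, hypothetical 
sketch (invented for illustration; not Flink's or RocksDB's actual code) of 
recording a full-file checksum before an SST file is uploaded and verifying it 
after download:
{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.CRC32;

// Hypothetical helper, for illustration only.
final class FileChecksum {

    // Computes a full-file checksum, e.g. of a local SST file before upload.
    static long crc32Of(Path file) throws IOException {
        CRC32 crc = new CRC32();
        byte[] buffer = new byte[64 * 1024];
        try (InputStream in = Files.newInputStream(file)) {
            int read;
            while ((read = in.read(buffer)) != -1) {
                crc.update(buffer, 0, read);
            }
        }
        return crc.getValue();
    }

    // Verifies a downloaded copy against the checksum recorded before upload.
    static void verifyDownload(Path downloaded, long expectedCrc) throws IOException {
        long actual = crc32Of(downloaded);
        if (actual != expectedCrc) {
            throw new IOException(
                    "Checksum mismatch: expected " + expectedCrc + " but was " + actual);
        }
    }
}
{code}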

> Improve the availability of Flink when the RocksDB file is corrupted.
> -
>
> Key: FLINK-27681
> URL: https://issues.apache.org/jira/browse/FLINK-27681
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ming Li
>Assignee: Yue Ma
>Priority: Critical
>  Labels: pull-request-available
> Attachments: image-2023-08-23-15-06-16-717.png
>
>
> We have encountered several cases where the RocksDB checksum does not match or 
> the block verification fails when the job is restored. The reason for this 
> situation is generally that there are some problems with the machine where 
> the task is located, which causes the files uploaded to HDFS to be incorrect, 
> but by the time we found this problem a long time had passed (a dozen minutes 
> to half an hour). I'm not sure if anyone else has had a similar problem.
> Since this file is referenced by incremental checkpoints for a long time, 
> when the maximum number of retained checkpoints is exceeded, we can only use 
> this file until it is no longer referenced. When the job fails, it cannot be 
> recovered.
> Therefore we consider:
> 1. Can RocksDB periodically check whether all files are correct and find the 
> problem in time?
> 2. Can Flink automatically roll back to the previous checkpoint when there is 
> a problem with the checkpoint data? Because even with manual intervention, it 
> just tries to recover from the existing checkpoint or discard the entire 
> state.
> 3. Can we increase the maximum number of references to a file based on the 
> maximum number of retained checkpoints? When the number of references exceeds 
> the maximum number of checkpoints -1, the Task side is required to upload a 
> new file for this reference. Not sure if this way will ensure that the new 
> file we upload will be correct.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33663] Serialize CallExpressions into SQL [flink]

2023-12-05 Thread via GitHub


dawidwys merged PR #23811:
URL: https://github.com/apache/flink/pull/23811





Re: [PR] [FLINK-24150][k8s] Support to configure cpu resource request and limit in pod template [flink]

2023-12-05 Thread via GitHub


gyfora commented on PR #23872:
URL: https://github.com/apache/flink/pull/23872#issuecomment-1840282229

   I understand that this may be confusing for you, but unfortunately we cannot 
merge this PR in this form. Let's work on the operator side to solve this 
properly, as described in the linked Jira.





[jira] [Closed] (FLINK-33663) Serialize CallExpressions into SQL

2023-12-05 Thread Dawid Wysakowicz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz closed FLINK-33663.

Resolution: Implemented

Implemented in bc6c2cec37c45f021ae22a2a7b5ab9537b8506cd

> Serialize CallExpressions into SQL
> --
>
> Key: FLINK-33663
> URL: https://issues.apache.org/jira/browse/FLINK-33663
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> The task is about introducing {{CallSyntax}} and implementing versions for 
> non-standard SQL functions





Re: [PR] [FLINK-32986][test] Fix createTemporaryFunction type inference error [flink]

2023-12-05 Thread via GitHub


jeyhunkarimov commented on PR #23586:
URL: https://github.com/apache/flink/pull/23586#issuecomment-1840305544

   Does it make any difference? Yes and no, IMHO. 
   Yes, because imagine `test1()` executes the statement 
`createTemporarySystemFunction("add", new TestAddWithOpen)` (likewise for 
`createTemporaryFunction`). In this case, the `add` function is visible to 
`test2()`.
   
   No, because this (creating a new table for each test) would be overkill 
IMHO. We just need to be sure that we are testing the functions that are 
defined within the scope of a particular test (independent of whether we use 
`createTemporarySystemFunction` or `createTemporaryFunction`). 
   
   Some alternatives would be:
   - we could explicitly drop functions in each test 
   - we could create a method to upsert a function (if a function exists, drop 
it and register the new one); see the sketch after this list
   - we could use global functions in `before` via 
`createTemporarySystemFunction` and use the functions that should live within 
the scope of the test via `createTemporaryFunction` (we should still have some 
mechanism to drop them)
   
   In any case, the scope of this PR has diverged quite a bit (the main issue 
was already resolved in the meantime). If you guys agree, I would close this PR 
and mark the issue as resolved. 
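
   A minimal sketch of the upsert alternative (the helper class and method name 
are invented for illustration; `dropTemporarySystemFunction` and 
`createTemporarySystemFunction` are the existing `TableEnvironment` methods):
   
   ```java
   import org.apache.flink.table.api.TableEnvironment;
   import org.apache.flink.table.functions.UserDefinedFunction;
   
   // Hypothetical test utility, for illustration only.
   final class FunctionTestUtils {
   
       // Drops the function if it already exists, then registers the new one.
       static void upsertTemporarySystemFunction(
               TableEnvironment tEnv, String name, Class<? extends UserDefinedFunction> fn) {
           // dropTemporarySystemFunction returns false when the function did
           // not exist, so it is safe to call unconditionally.
           tEnv.dropTemporarySystemFunction(name);
           tEnv.createTemporarySystemFunction(name, fn);
       }
   }
   ```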





Re: [PR] [FLINK-33730][doc] update the Flink upgrade savepoint compatibility table doc [flink]

2023-12-05 Thread via GitHub


JingGe commented on PR #23859:
URL: https://github.com/apache/flink/pull/23859#issuecomment-1840309252

   @PatrickRen Thanks for the review. I have addressed your suggestion and added 
the 1.16.x column back. Would you please check it again?





Re: [PR] [FLINK-32881][checkpoint] Support triggering savepoint in detach mode for CLI and dumping all pending savepoint ids by rest api [flink]

2023-12-05 Thread via GitHub


masteryhx commented on code in PR #23253:
URL: https://github.com/apache/flink/pull/23253#discussion_r1415176143


##
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java:
##
@@ -157,6 +157,14 @@ public class CliFrontendParser {
                 + " for changing state backends, native = a specific format for the"
                 + " chosen state backend, might be faster to take and restore from.");
 
+    public static final Option SAVEPOINT_DETACH_OPTION =
+            new Option(
+                    "dcp",

Review Comment:
   Thanks for the clarification.
   It seems we have to use a new option for historical reasons.






[jira] [Updated] (FLINK-33730) Update the compatibility table to only include last three released versions

2023-12-05 Thread Jing Ge (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jing Ge updated FLINK-33730:

Summary: Update the compatibility table to only include last three released 
versions  (was: Update the compatibility table to only include last two 
released versions)

> Update the compatibility table to only include last three released versions
> ---
>
> Key: FLINK-33730
> URL: https://issues.apache.org/jira/browse/FLINK-33730
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Jing Ge
>Assignee: Jing Ge
>Priority: Minor
>  Labels: pull-request-available
>
> Update the compatibility table 
> ([apache-flink:./docs/content/docs/ops/upgrading.md|https://github.com/apache/flink/blob/master/docs/content/docs/ops/upgrading.md#compatibility-table]
>  and 
> [apache-flink:./docs/content.zh/docs/ops/upgrading.md|https://github.com/apache/flink/blob/master/docs/content.zh/docs/ops/upgrading.md#compatibility-table])
>  according to the discussion[1].
>  
> [1] https://lists.apache.org/thread/7yx396x5lmtws0s4t0sf9f2psgny11d6
>  





Re: [PR] [FLINK-32881][checkpoint] Support triggering savepoint in detach mode for CLI and dumping all pending savepoint ids by rest api [flink]

2023-12-05 Thread via GitHub


masteryhx commented on code in PR #23253:
URL: https://github.com/apache/flink/pull/23253#discussion_r1415217180


##
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java:
##
@@ -157,6 +157,14 @@ public class CliFrontendParser {
                 + " for changing state backends, native = a specific format for the"
                 + " chosen state backend, might be faster to take and restore from.");
 
+    public static final Option SAVEPOINT_DETACH_OPTION =
+            new Option(
+                    "dcp",

Review Comment:
   It would be clearer if we could make it aligned in Flink 2.0.
   cc @Zakelly






Re: [PR] [FLINK-28229][connectors] Introduce FLIP-27 alternative to StreamExecutionEnvironment#fromCollection() [flink]

2023-12-05 Thread via GitHub


zentol merged PR #23558:
URL: https://github.com/apache/flink/pull/23558





[jira] [Closed] (FLINK-28229) Introduce Source API alternatives for StreamExecutionEnvironment#fromCollection() methods

2023-12-05 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-28229.

Fix Version/s: 1.19.0
   Resolution: Fixed

master:
e44efbff8070dca3489550fdeadc5e1ce31e68c1
18c03f2e6c593a772f64cdb5c089e2911d3cbc89

> Introduce Source API alternatives for 
> StreamExecutionEnvironment#fromCollection() methods
> -
>
> Key: FLINK-28229
> URL: https://issues.apache.org/jira/browse/FLINK-28229
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> * FromElementsFunction
>  * FromIteratorFunction
> are based on SourceFunction API





Re: [PR] [FLINK-32881][checkpoint] Support triggering savepoint in detach mode for CLI and dumping all pending savepoint ids by rest api [flink]

2023-12-05 Thread via GitHub


Zakelly commented on code in PR #23253:
URL: https://github.com/apache/flink/pull/23253#discussion_r1415245692


##
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java:
##
@@ -157,6 +157,14 @@ public class CliFrontendParser {
                 + " for changing state backends, native = a specific format for the"
                 + " chosen state backend, might be faster to take and restore from.");
 
+    public static final Option SAVEPOINT_DETACH_OPTION =
+            new Option(
+                    "dcp",

Review Comment:
   Hi @xiangforever2014 @masteryhx, sorry for jumping in... I'm wondering if we 
could use the capital '-D' as the short option? Or, I would suggest, we leave 
no short option for this and only use '-detach'.
   
   I agree we could adjust these CLI options in 2.0.






[jira] [Updated] (FLINK-33603) Fix guava shading for GCS connector

2023-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33603:
---
Labels: pull-request-available  (was: )

> Fix guava shading for GCS connector
> ---
>
> Key: FLINK-33603
> URL: https://issues.apache.org/jira/browse/FLINK-33603
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.17.1
>Reporter: Jayadeep Jayaraman
>Priority: Major
>  Labels: pull-request-available
>
> The GCS connector has a guava shading issue. This change introduced a 
> dependency on the guava version ({{31.1-jre}}) required by 
> {{google-cloud-storage}}. The upgrade of {{google-cloud-storage}} led to a 
> runtime failure because of new functionality added in {{31.1-jre}}.
> This change pins the guava version to the one required by the storage client, 
> specifically in {{flink-gs-fs-hadoop}}, leaving all other filesystem 
> implementations untouched.





Re: [PR] [FLINK-32881][checkpoint] Support triggering savepoint in detach mode for CLI and dumping all pending savepoint ids by rest api [flink]

2023-12-05 Thread via GitHub


Zakelly commented on PR #23253:
URL: https://github.com/apache/flink/pull/23253#issuecomment-1840388226

   Hi @xiangforever2014 @masteryhx, sorry for jumping in... I'm wondering if we 
could use the capital '-D' as the short option? Or, I would suggest, we leave 
no short option for this and only use '-detach'. WDYT?
   
   I agree we could adjust these CLI options in 2.0. 





Re: [PR] [ FLINK-33603][FileSystems] shade guava in gs-fs filesystem [flink]

2023-12-05 Thread via GitHub


singhravidutt commented on PR #23489:
URL: https://github.com/apache/flink/pull/23489#issuecomment-1840388220

   @flinkbot run azure





Re: [PR] [FLINK-32881][checkpoint] Support triggering savepoint in detach mode for CLI and dumping all pending savepoint ids by rest api [flink]

2023-12-05 Thread via GitHub


masteryhx commented on code in PR #23253:
URL: https://github.com/apache/flink/pull/23253#discussion_r1415222035


##
docs/content.zh/docs/deployment/cli.md:
##
@@ -125,6 +125,43 @@ Lastly, you can optionally provide what should be the 
[binary format]({{< ref "d
 
 The path to the savepoint can be used later on to [restart the Flink 
job](#starting-a-job-from-a-savepoint).
 
+If the state of the job is quite big, the client will get a timeout exception 
since it should wait for the savepoint finished.

Review Comment:
   ```suggestion
   If the state size of the job is quite big, the client will get a timeout 
exception since it has to wait for the savepoint to finish.
   ```



##
docs/content.zh/docs/deployment/cli.md:
##
@@ -125,6 +125,43 @@ Lastly, you can optionally provide what should be the 
[binary format]({{< ref "d
 
 The path to the savepoint can be used later on to [restart the Flink 
job](#starting-a-job-from-a-savepoint).
 
+If the state of the job is quite big, the client will get a timeout exception 
since it should wait for the savepoint finished.
+```
+Triggering savepoint for job bec5244e09634ad71a80785937a9732d.
+Waiting for response...
+
+--
+The program finished with the following exception:
+
+org.apache.flink.util.FlinkException: Triggering a savepoint for the job bec5244e09634ad71a80785937a9732d failed.
+at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:828)
+at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$8(CliFrontend.java:794)
+at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1078)
+at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:779)
+at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1150)
+at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1226)
+at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
+at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1226)
+at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1194)
+Caused by: java.util.concurrent.TimeoutException
+at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
+at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
+at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:822)
+... 8 more
+```
+In this case, we could use "-dsp" option to trigger a detached savepoint, the 
client will return immediately as soon as the trigger id returns.

Review Comment:
   ```suggestion
   In this case, we could use the "-dsp" option to trigger a detached savepoint; 
the client will return the trigger id immediately.
   ```
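
   For quick reference, the documented option would be invoked roughly as 
follows (a sketch only; the job id and target directory are placeholders, and 
the exact syntax and output are defined by this PR):
   
   ```
   $ ./bin/flink savepoint -dsp <jobId> [<targetDirectory>]
   ```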



Re: [PR] [FLINK-32073][checkpoint] Implement file merging in snapshot [flink]

2023-12-05 Thread via GitHub


Zakelly commented on code in PR #23514:
URL: https://github.com/apache/flink/pull/23514#discussion_r1415294769


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStream.java:
##
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
+import org.apache.flink.runtime.checkpoint.filemerging.SegmentFileStateHandle;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * A {@link CheckpointStateOutputStream} that writes into a segment of a file 
and returns a {@link
+ * SegmentFileStateHandle} upon closing. Multiple {@link 
FileMergingCheckpointStateOutputStream}
+ * objects can reuse the same underlying file, so that the checkpoint files 
are merged.
+ *
+ * Important: This implementation is NOT thread-safe. 
Multiple data streams
+ * multiplexing the same file should NOT write concurrently. Instead, it is 
expected that only after
+ * one data stream is closed, will other data streams reuse and write to the 
same underlying file.
+ */
+public class FileMergingCheckpointStateOutputStream extends 
CheckpointStateOutputStream {
+
+private static final Logger LOG =
+
LoggerFactory.getLogger(FileMergingCheckpointStateOutputStream.class);
+
+/**
+ * A proxy of the {@link FileMergingSnapshotManager} that owns this {@link
+ * FileMergingCheckpointStateOutputStream}, with the interfaces for 
dealing with physical files.
+ */
+public abstract static class FileMergingSnapshotManagerProxy {
+/**
+ * Provide a physical file.
+ *
+ * @return Output stream and path of the physical file.
+ * @throws IOException if the physical file cannot be created or 
opened.
+ */
+public abstract Tuple2<FSDataOutputStream, Path> providePhysicalFile() throws IOException;
+
+/**
+ * Close the stream and create a {@link SegmentFileStateHandle} for a 
file segment.
+ *
+ * @param filePath Path of the physical file.
+ * @param startPos Start position of the segment in the physical file.
+ * @param stateSize Size of the segment.
+ * @return The state handle of the segment.
+ * @throws IOException if any exception happens when closing the file.
+ */
+public abstract SegmentFileStateHandle closeStreamAndCreateStateHandle(
+Path filePath, long startPos, long stateSize) throws 
IOException;
+
+/**
+ * Notify the {@link FileMergingSnapshotManager} that the stream is 
closed exceptionally.
+ *
+ * @throws IOException if any exception happens when deleting the file.
+ */
+public abstract void closeStreamExceptionally() throws IOException;
+}
+
+private final FileMergingSnapshotManagerProxy 
fileMergingSnapshotManagerProxy;
+
+private volatile boolean closed;
+
+/** path of the underlying physical file. */
+private Path filePath;
+
+/** the stream that writes to the underlying physical file. */
+private @Nullable FSDataOutputStream outputStream;
+
+/** start position in the physical file. */
+private long startPos;
+
+/** current position relative to startPos. */
+long curPosRelative = 0;
+
+/** the buffer for writing to the physical file. */
+private final byte[] writeBuffer;
+
+/** current position in the writeBuffer. */
+int bufferPos;
+
+public FileMergingCheckpointStateOutputStream(
+long checkpointID,
+int bufferSize,
+FileMergingSnapshotManagerProxy fileMergingSnapshotManagerProxy) {
+this.fileMergingSnapshotManagerProxy = fileMergingSnapshotManagerProxy;
+this.writeBuffer = new byte[bufferSize];
+}
+
+  

Re: [PR] [FLINK-32073][checkpoint] Implement file merging in snapshot [flink]

2023-12-05 Thread via GitHub


Zakelly commented on PR #23514:
URL: https://github.com/apache/flink/pull/23514#issuecomment-1840444858

   > Thanks @fredia and @Zakelly for your comments. Sorry for the late reply. I 
have addressed your comments. Would you please take another look?
   
   Thanks for your effort. I have no further comments apart from a minor one, PTAL, thanks.





Re: [PR] [FLIP-321] Update the docs to add migration periods for deprecated APIs. [flink]

2023-12-05 Thread via GitHub


gaborgsomogyi commented on PR #23865:
URL: https://github.com/apache/flink/pull/23865#issuecomment-1840452321

   Thanks for the explanation!
   
   > In terms of the source / binary compatibility, the main reason we do not 
provide binary compatibility in minor releases for any API stability level is 
because we treat the protocol / format behind the public API as internal.
   
   If I understand correctly, then a couple of terms/compatibility levels are 
aggregated in the table. Since we have `japicmp`, which purely guarantees 
Java-based source/binary compatibility, I was checking the table only from that 
point of view; the compatibility of the underlying protocols/internal files is 
not considered.
   All in all that makes sense. Maybe we can highlight this info in a note to 
spare others some time and help them avoid this pitfall.
   
   > I agree the compatibility table looks confusing, although the contents 
seem accurate. I am wondering if some more concrete examples can help here.
   
   With a fresh mind I've re-read and evaluated the whole thing again, and it 
seems I had assumed that the `Public` and `PublicEvolving` descriptions follow 
the same pattern in terms of the `when to remove old API` statements (2 and 1 
minor releases 🙂). I can confirm the migration guide fits the guarantees 
matrix.
   
   In general my conclusion is that the existing documentation is valid, but 
users are going to spend quite some time understanding each tiny piece.
   





Re: [PR] [FLINK-33541][table-planner] function RAND and RAND_INTEGER should return type nullable if the arguments are nullable [flink]

2023-12-05 Thread via GitHub


libenchao commented on PR #23779:
URL: https://github.com/apache/flink/pull/23779#issuecomment-1840508653

   @flinkbot run azure





[jira] [Commented] (FLINK-33670) Public operators cannot be reused in multi sinks

2023-12-05 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793232#comment-17793232
 ] 

Jeyhun Karimov commented on FLINK-33670:


Hi [~zicat], I tried with the Flink master branch; below is the plan I get for 
your case. Looking at the optimized execution plan, it seems the deduplicate 
part (reuse_id=1) is reused between the two sinks. Can you confirm?

 

== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.print1], fields=[id, ts])
+- LogicalProject(id=[$0], ts=[$1])
   +- LogicalProject(id=[$0], ts=[$1], v=[$2], row_nu=[$3])
      +- LogicalFilter(condition=[=($3, 1)])
         +- LogicalProject(id=[$0], ts=[$1], v=[$2], row_nu=[ROW_NUMBER() OVER 
(PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)])
            +- LogicalTableScan(table=[[default_catalog, default_database, 
source]])

LogicalSink(table=[default_catalog.default_database.print2], fields=[id, 
EXPR$1, EXPR$2])
+- LogicalProject(id=[$1], EXPR$1=[TUMBLE_START($0)], EXPR$2=[$2])
   +- LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)])
      +- LogicalProject($f0=[$TUMBLE($1, 2:INTERVAL SECOND)], id=[$0], 
v=[$2])
         +- LogicalProject(id=[$0], ts=[$1], v=[$2], row_nu=[$3])
            +- LogicalFilter(condition=[=($3, 1)])
               +- LogicalProject(id=[$0], ts=[$1], v=[$2], row_nu=[ROW_NUMBER() 
OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)])
                  +- LogicalTableScan(table=[[default_catalog, 
default_database, source]])

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.print1], fields=[id, ts])
+- Calc(select=[id, ts], where=[=(w0$o0, 1)])
   +- OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window#0=[ROW_NUMBER(*) 
AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[id, ts, v, 
$3, w0$o0])
      +- Sort(orderBy=[id ASC, $3 ASC])
         +- Calc(select=[id, ts, v, PROCTIME() AS $3])
            +- Exchange(distribution=[hash[id]])
               +- TableSourceScan(table=[[default_catalog, default_database, 
source]], fields=[id, ts, v])

Sink(table=[default_catalog.default_database.print2], fields=[id, EXPR$1, 
EXPR$2])
+- Calc(select=[id, w$start AS EXPR$1, EXPR$2])
   +- HashWindowAggregate(groupBy=[id], window=[TumblingGroupWindow('w$, ts, 
2)], properties=[w$start, w$end, w$rowtime], select=[id, SUM(v) AS EXPR$2])
      +- Calc(select=[ts, id, v], where=[=(w0$o0, 1)])
         +- OverAggregate(partitionBy=[id], orderBy=[$3 ASC], 
window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW], select=[id, ts, v, $3, w0$o0])
            +- Sort(orderBy=[id ASC, $3 ASC])
               +- Calc(select=[id, ts, v, PROCTIME() AS $3])
                  +- Exchange(distribution=[hash[id]])
                     +- TableSourceScan(table=[[default_catalog, 
default_database, source]], fields=[id, ts, v])

== Optimized Execution Plan ==
OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window#0=[ROW_NUMBER(*) AS 
w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[id, ts, v, $3, 
w0$o0])(reuse_id=[1])
+- Exchange(distribution=[forward])
   +- Sort(orderBy=[id ASC, $3 ASC])
      +- Exchange(distribution=[keep_input_as_is[hash[id]]])
         +- Calc(select=[id, ts, v, PROCTIME() AS $3])
            +- Exchange(distribution=[hash[id]])
               +- TableSourceScan(table=[[default_catalog, default_database, 
source]], fields=[id, ts, v])

Sink(table=[default_catalog.default_database.print1], fields=[id, ts])
+- Calc(select=[id, ts], where=[(w0$o0 = 1)])
   +- Reused(reference_id=[1])

Sink(table=[default_catalog.default_database.print2], fields=[id, EXPR$1, 
EXPR$2])
+- Calc(select=[id, w$start AS EXPR$1, EXPR$2])
   +- HashWindowAggregate(groupBy=[id], window=[TumblingGroupWindow('w$, ts, 
2)], properties=[w$start, w$end, w$rowtime], select=[id, SUM(v) AS EXPR$2])
      +- Exchange(distribution=[keep_input_as_is[hash[id]]])
         +- Calc(select=[ts, id, v], where=[(w0$o0 = 1)])
            +- Exchange(distribution=[keep_input_as_is[hash[ts]]])
               +- Reused(reference_id=[1])

 

 

 

> Public operators cannot be reused in multi sinks
> 
>
> Key: FLINK-33670
> URL: https://issues.apache.org/jira/browse/FLINK-33670
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Lyn Zhang
>Priority: Major
> Attachments: image-2023-11-28-14-31-30-153.png
>
>
> Dear all:
>    I find that some public operators cannot be reused when submitting a job 
> with multiple sinks. I have an example as follows:
> {code:java}
> CREATE TABLE source (
>     id              STRING,
>     ts              TIMESTAMP(3),
>     v              BIGINT,
>     WATERMARK FOR ts AS ts - INTERVAL '3' SECON

[jira] [Created] (FLINK-33751) use modules correctly when deserializing json plan

2023-12-05 Thread shuaiqi.guo (Jira)
shuaiqi.guo created FLINK-33751:
---

 Summary: use modules correctly when deserializing json plan
 Key: FLINK-33751
 URL: https://issues.apache.org/jira/browse/FLINK-33751
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: shuaiqi.guo


When serializing and deserializing a SQL job via the following SQL syntax:
{code:java}
COMPILE PLAN ...;
EXECUTE PLAN ...;{code}
if there are two modules in the environment, some bugs appear when calling 
lookupOptionalSqlOperator() (see the sketch after this list):
 # if 2 operators are found, it returns empty;
 # foundOperators is not ordered by the module order.
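
A minimal sketch of the lookup behavior the report expects (generic stand-in 
code; the actual planner types differ): iterate the modules in their configured 
order and return the first match, instead of returning empty when several 
modules provide the operator.
{code:java}
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

final class OperatorLookup {

    // Returns the operator from the first module (in configured order) that
    // provides one, instead of returning empty when several modules match.
    static <M, O> Optional<O> lookupInModuleOrder(
            List<M> modulesInOrder, Function<M, Optional<O>> lookupInModule) {
        for (M module : modulesInOrder) {
            Optional<O> candidate = lookupInModule.apply(module);
            if (candidate.isPresent()) {
                return candidate; // first module wins; later matches are shadowed
            }
        }
        return Optional.empty();
    }
}
{code}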

 





[jira] [Updated] (FLINK-33751) use modules correctly when deserializing json plan

2023-12-05 Thread shuaiqi.guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuaiqi.guo updated FLINK-33751:

Attachment: FLINK-33751.patch

> use modules correctly when deserializing json plan
> --
>
> Key: FLINK-33751
> URL: https://issues.apache.org/jira/browse/FLINK-33751
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: shuaiqi.guo
>Priority: Major
> Attachments: FLINK-33751.patch
>
>
> When serializing and deserializing a SQL job via the following SQL syntax:
> {code:java}
> COMPILE PLAN ...;
> EXECUTE PLAN ...;{code}
> if there are two modules in the environment, some bugs appear when calling 
> lookupOptionalSqlOperator():
>  # if 2 operators are found, it returns empty;
>  # foundOperators is not ordered by the module order.
>  





[jira] [Created] (FLINK-33752) When Duration is greater than or equal to 1 day, the display unit is ms.

2023-12-05 Thread Rui Fan (Jira)
Rui Fan created FLINK-33752:
---

 Summary: When Duration is greater than or equal to 1 day, the 
display unit is ms.
 Key: FLINK-33752
 URL: https://issues.apache.org/jira/browse/FLINK-33752
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Configuration
Affects Versions: 1.18.0
Reporter: Rui Fan
Assignee: Rui Fan
 Fix For: 1.19.0, 1.18.1
 Attachments: image-2023-12-05-19-44-17-161.png

When the default value of a Duration is 24 hours or 1 day, the display unit is 
ms (86400000 ms).

 

For example, the kubernetes operator doc has 3 options whose default values are 
86400000 ms.

https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/

 

!image-2023-12-05-19-44-17-161.png!





[jira] [Created] (FLINK-33753) ContinuousFileReaderOperator consume records as mini batch

2023-12-05 Thread Prabhu Joseph (Jira)
Prabhu Joseph created FLINK-33753:
-

 Summary: ContinuousFileReaderOperator consume records as mini batch
 Key: FLINK-33753
 URL: https://issues.apache.org/jira/browse/FLINK-33753
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.18.0
Reporter: Prabhu Joseph


The ContinuousFileReaderOperator reads and collects the records from a split in 
a loop. If the split size is large, the loop will take more time, and the 
mailbox executor won't have a chance to process the checkpoint barrier. This 
leads to checkpoints timing out. ContinuousFileReaderOperator could be improved 
to consume the records in mini batches, similar to Hudi's StreamReadOperator 
(https://issues.apache.org/jira/browse/HUDI-2485). A sketch of the idea follows.
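
A minimal, self-contained sketch of the mini-batch idea (the Mailbox interface 
below is a stand-in invented for illustration, not Flink's actual 
MailboxExecutor API): read at most a fixed number of records, then re-enqueue 
the continuation so that control actions such as checkpoint barriers can run in 
between.
{code:java}
import java.util.Iterator;
import java.util.function.Consumer;

final class MiniBatchReader {

    // Stand-in for a mailbox-style executor that runs actions one at a time.
    interface Mailbox {
        void execute(Runnable action);
    }

    private static final int BATCH_SIZE = 1_000;

    // Reads up to BATCH_SIZE records, then yields back to the mailbox.
    static <T> void readInMiniBatches(Iterator<T> split, Mailbox mailbox, Consumer<T> out) {
        int consumed = 0;
        while (split.hasNext() && consumed < BATCH_SIZE) {
            out.accept(split.next());
            consumed++;
        }
        if (split.hasNext()) {
            // Re-enqueue the remainder; pending mails (e.g. a checkpoint
            // barrier action) get a chance to run before the next batch.
            mailbox.execute(() -> readInMiniBatches(split, mailbox, out));
        }
    }
}
{code}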





[jira] [Updated] (FLINK-33752) When Duration is greater than or equal to 1 day, the display unit is ms.

2023-12-05 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-33752:

Description: 
When the default value of a Duration is 24 hours or 1 day, the display unit is 
ms (86400000 ms).

 

For example, the kubernetes operator doc has 3 options whose default values are 
86400000 ms.

[https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/]

 

This bug comes from org.apache.flink.util.TimeUtils#formatWithHighestUnit, and 
it can be reproduced by TimeUtilsPrettyPrintingTest.

 

!image-2023-12-05-19-58-07-737.png|width=1247,height=957!

 

 

!image-2023-12-05-19-44-17-161.png!

  was:
When the default value of a Duration is 24 hours or 1 day, the display unit is 
ms (86400000 ms).

 

For example, the kubernetes operator doc has 3 options whose default values are 
86400000 ms.

https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/

 

!image-2023-12-05-19-44-17-161.png!


> When Duration is greater than or equal to 1 day, the display unit is ms.
> 
>
> Key: FLINK-33752
> URL: https://issues.apache.org/jira/browse/FLINK-33752
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration
>Affects Versions: 1.18.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Fix For: 1.19.0, 1.18.1
>
> Attachments: image-2023-12-05-19-44-17-161.png, 
> image-2023-12-05-19-58-07-737.png
>
>
> When the default value of a Duration is 24 hours or 1 day, the display unit 
> is ms (86400000 ms).
>  
> For example, the kubernetes operator doc has 3 options whose default values 
> are 86400000 ms.
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/]
>  
> This bug comes from org.apache.flink.util.TimeUtils#formatWithHighestUnit, 
> and it can be reproduced by TimeUtilsPrettyPrintingTest.
>  
> !image-2023-12-05-19-58-07-737.png|width=1247,height=957!
>  
>  
> !image-2023-12-05-19-44-17-161.png!





[jira] [Updated] (FLINK-33752) When Duration is greater than or equal to 1 day, the display unit is ms.

2023-12-05 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-33752:

Attachment: image-2023-12-05-19-58-07-737.png

> When Duration is greater than or equal to 1 day, the display unit is ms.
> 
>
> Key: FLINK-33752
> URL: https://issues.apache.org/jira/browse/FLINK-33752
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration
>Affects Versions: 1.18.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Fix For: 1.19.0, 1.18.1
>
> Attachments: image-2023-12-05-19-44-17-161.png, 
> image-2023-12-05-19-58-07-737.png
>
>
> When the default value of a Duration is 24 hours or 1 day, the display unit 
> is ms (86400000 ms).
>  
> For example, the kubernetes operator doc has 3 options whose default values 
> are 86400000 ms.
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/
>  
> !image-2023-12-05-19-44-17-161.png!





[jira] [Created] (FLINK-33754) Serialize QueryOperations into SQL

2023-12-05 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-33754:


 Summary: Serialize QueryOperations into SQL
 Key: FLINK-33754
 URL: https://issues.apache.org/jira/browse/FLINK-33754
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
 Fix For: 1.19.0








[PR] [FLINK-33752][Configuration] Change the displayed timeunit to day when the duration is an integral multiple of 1 day [flink]

2023-12-05 Thread via GitHub


1996fanrui opened a new pull request, #23873:
URL: https://github.com/apache/flink/pull/23873

   ## What is the purpose of the change
   
   When the default value of a Duration is 24 hours or 1 day, the display unit 
is ms (86400000 ms). For example, the [kubernetes operator 
doc](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/)
 has 3 options whose default values are 86400000 ms.
   
   This bug comes from `org.apache.flink.util.TimeUtils#formatWithHighestUnit`, 
and it can be reproduced by `TimeUtilsPrettyPrintingTest`. 
   
   Bug reason: when the duration is `1 day`, the code filters out all TimeUnits, 
so it falls back to the default `TimeUnit.MILLISECONDS`.
   
   https://github.com/apache/flink/assets/38427477/42696e58-e147-45d3-b93f-cafa9ae6ec8e
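
   A minimal sketch of the intended formatting behavior (a stand-alone 
illustration, not Flink's actual `TimeUtils` code): pick the coarsest unit in 
which the duration is integral, so 1 day prints as `1 d` instead of 
`86400000 ms`.
   
   ```java
   import java.time.Duration;
   import java.util.concurrent.TimeUnit;
   
   final class DurationFormatting {
   
       // Formats a duration with the coarsest unit that divides it evenly.
       // Edge cases (e.g. a zero duration) are ignored in this sketch.
       static String formatWithHighestUnit(Duration duration) {
           long millis = duration.toMillis();
           TimeUnit[] units = {
               TimeUnit.DAYS, TimeUnit.HOURS, TimeUnit.MINUTES,
               TimeUnit.SECONDS, TimeUnit.MILLISECONDS
           };
           String[] labels = {"d", "h", "min", "s", "ms"};
           for (int i = 0; i < units.length; i++) {
               long inUnit = units[i].convert(millis, TimeUnit.MILLISECONDS);
               if (units[i].toMillis(inUnit) == millis) {
                   return inUnit + " " + labels[i];
               }
           }
           return millis + " ms";
       }
   
       public static void main(String[] args) {
           System.out.println(formatWithHighestUnit(Duration.ofDays(1)));   // 1 d
           System.out.println(formatWithHighestUnit(Duration.ofHours(36))); // 36 h
       }
   }
   ```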
   
   
   
   ## Brief change log
   
   - [FLINK-33752][Configuration] Change the displayed timeunit to day when the 
duration is an integral multiple of 1 day
  - Find the correct `HighestIntegerUnit` for 1 day.
   
   ## Verifying this change
   
   - Added some arguments for `TimeUtilsPrettyPrintingTest#testFormatting`

   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





[jira] [Updated] (FLINK-33752) When Duration is greater than or equal to 1 day, the display unit is ms.

2023-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33752:
---
Labels: pull-request-available  (was: )

> When Duration is greater than or equal to 1 day, the display unit is ms.
> 
>
> Key: FLINK-33752
> URL: https://issues.apache.org/jira/browse/FLINK-33752
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration
>Affects Versions: 1.18.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0, 1.18.1
>
> Attachments: image-2023-12-05-19-44-17-161.png, 
> image-2023-12-05-19-58-07-737.png
>
>
> When the default value of a Duration is 24 hours or 1 day, the display unit 
> is ms (86400000 ms).
>  
> For example, the kubernetes operator doc has 3 options whose default values 
> are 86400000 ms.
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/]
>  
> This bug comes from org.apache.flink.util.TimeUtils#formatWithHighestUnit, 
> and it can be reproduced by TimeUtilsPrettyPrintingTest.
>  
> !image-2023-12-05-19-58-07-737.png|width=1247,height=957!
>  
>  
> !image-2023-12-05-19-44-17-161.png!





Re: [PR] [FLINK-33266][sql-gateway] Support plan cache for DQL in SQL Gateway [flink]

2023-12-05 Thread via GitHub


zoudan commented on code in PR #23849:
URL: https://github.com/apache/flink/pull/23849#discussion_r1415242114


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ResultProvider.java:
##
@@ -59,4 +59,7 @@ public interface ResultProvider {
  * {@link CloseableIterator#next()} method returns a row.
  */
 boolean isFirstRowReady();
+
+/** Reset this ResultProvider to the original state it was in when created. */
+default void reset() {}

Review Comment:
   You are right, but the problem is that Flink creates the `ResultProvider` 
while translating `ModifyOperation` into `Transformation` (in 
DynamicSinkUtils#convertCollectToRel), and we will not call this when we hit 
the plan cache. I find it hard to recreate a new one here.






Re: [PR] [FLINK-33266][sql-gateway] Support plan cache for DQL in SQL Gateway [flink]

2023-12-05 Thread via GitHub


zoudan commented on code in PR #23849:
URL: https://github.com/apache/flink/pull/23849#discussion_r1415242114


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ResultProvider.java:
##
@@ -59,4 +59,7 @@ public interface ResultProvider {
  * {@link CloseableIterator#next()} method returns a row.
  */
 boolean isFirstRowReady();
+
+/** Reset this ResultProvider to the original state it was in when created. */
+default void reset() {}

Review Comment:
   You are right, but the problem is that Flink creates the `ResultProvider` 
while translating `ModifyOperation` into `Transformation` (in 
DynamicSinkUtils#convertCollectToRel), and we will not call this when we hit 
the plan cache. I find it hard to recreate a new one here (as we could not 
include dependencies on flink-table-planner).






Re: [PR] [FLINK-33502][network] Prevent DiskTierProducerAgent#getSegmentIdByIndexOfFirstBufferInSegment from throwing NPE when the task is released [flink]

2023-12-05 Thread via GitHub


WencongLiu commented on code in PR #23863:
URL: https://github.com/apache/flink/pull/23863#discussion_r1415495161


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java:
##
@@ -223,9 +228,16 @@ private void releaseResources() {
 }
 }
 
-    private Integer retrieveFirstBufferIndexInSegment(int subpartitionId, int bufferIndex) {
-        return firstBufferIndexInSegment.size() > subpartitionId
-                ? firstBufferIndexInSegment.get(subpartitionId).get(bufferIndex)
-                : null;
+    private Integer getSegmentIdByIndexOfFirstBufferInSegment(int subpartitionId, int bufferIndex) {
+        Integer segmentId;
+        try {
+            segmentId = firstBufferIndexInSegment.get(subpartitionId).get(bufferIndex);

Review Comment:
   Good point 🤔. Fixed.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java:
##
@@ -223,9 +228,16 @@ private void releaseResources() {
 }
 }
 
-    private Integer retrieveFirstBufferIndexInSegment(int subpartitionId, int bufferIndex) {
-        return firstBufferIndexInSegment.size() > subpartitionId
-                ? firstBufferIndexInSegment.get(subpartitionId).get(bufferIndex)
-                : null;
+    private Integer getSegmentIdByIndexOfFirstBufferInSegment(int subpartitionId, int bufferIndex) {
+        Integer segmentId;
+        try {
+            segmentId = firstBufferIndexInSegment.get(subpartitionId).get(bufferIndex);

Review Comment:
   Good point 🤔. Fixed.






Re: [PR] [FLINK-33752][Configuration] Change the displayed timeunit to day when the duration is an integral multiple of 1 day [flink]

2023-12-05 Thread via GitHub


flinkbot commented on PR #23873:
URL: https://github.com/apache/flink/pull/23873#issuecomment-1840670193

   
   ## CI report:
   
   * 3bbbddd74077c28f009567d49f99e6277223c03b UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Commented] (FLINK-33338) Bump FRocksDB version

2023-12-05 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793246#comment-17793246
 ] 

Yanfei Lei commented on FLINK-33338:


[~mayuehappy]/[~pnowojski]  Some updates:

I first released a 
[version|https://s01.oss.sonatype.org/content/repositories/releases/io/github/fredia/frocksdbjni/8.6.7-ververica-test-1.0/]
 (based on 8.6.7) for [benchmarking|http://jenkins.flink-speed.xyz/]. This 
version does not include the {{ppcle}} and {{windows}} platforms. And I 
encountered some problems when cherry-picking 
https://issues.apache.org/jira/browse/FLINK-19710 (because this part of the 
code has been updated many times). 

I also ran microbenchmarks based on the 8.6.7 version, and there were some big 
[performance 
regressions|http://flink-speed.xyz/comparison/?exe=1%2BL%2Bbenchmark-request%2C3%2BL%2Bmaster&ben=148%2C150%2C202%2C152%2C154%2C156%2C158%2C160%2C162%2C164%2C195%2C166%2C168%2C170%2C172%2C174%2C176%2C178%2C180%2C182%2C352%2C353&env=3&hor=true&bas=3%2BL%2Bmaster&chart=normal+bars]
 compared to 6.20.3.

> Bump FRocksDB version
> -
>
> Key: FLINK-33338
> URL: https://issues.apache.org/jira/browse/FLINK-33338
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Piotr Nowojski
>Priority: Major
>
> We need to bump RocksDB in order to be able to use new IngestDB and ClipDB 
> commands.
> If some of the required changes haven't been merged to Facebook/RocksDB, we 
> should cherry-pick and include them in our FRocksDB fork.





Re: [PR] [FLINK-33502][network] Prevent DiskTierProducerAgent#getSegmentIdByIndexOfFirstBufferInSegment from throwing NPE when the task is released [flink]

2023-12-05 Thread via GitHub


WencongLiu commented on code in PR #23863:
URL: https://github.com/apache/flink/pull/23863#discussion_r1415504026


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java:
##
@@ -223,9 +228,16 @@ private void releaseResources() {
 }
 }
 
-    private Integer retrieveFirstBufferIndexInSegment(int subpartitionId, int bufferIndex) {
-        return firstBufferIndexInSegment.size() > subpartitionId
-                ? firstBufferIndexInSegment.get(subpartitionId).get(bufferIndex)
-                : null;
+    private Integer getSegmentIdByIndexOfFirstBufferInSegment(int subpartitionId, int bufferIndex) {
+        Integer segmentId;
+        try {
+            segmentId = firstBufferIndexInSegment.get(subpartitionId).get(bufferIndex);
+        } catch (Exception e) {

Review Comment:
I have adjusted the code so that `firstBufferIndexInSegment` itself remains 
unchanged and only its elements are cleared.
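
For illustration, a null-safe variant of the lookup under discussion (a sketch 
only, assuming `firstBufferIndexInSegment` is a `List<Map<Integer, Integer>>`; 
the real field type may differ), which avoids the catch-all exception handler:

```java
import java.util.List;
import java.util.Map;

final class SegmentLookup {

    // Returns null instead of throwing when the structures were already released.
    static Integer segmentIdOf(
            List<Map<Integer, Integer>> firstBufferIndexInSegment,
            int subpartitionId,
            int bufferIndex) {
        if (subpartitionId < 0 || subpartitionId >= firstBufferIndexInSegment.size()) {
            return null;
        }
        Map<Integer, Integer> bySubpartition = firstBufferIndexInSegment.get(subpartitionId);
        // The per-subpartition map may have been cleared on release; get()
        // then simply returns null.
        return bySubpartition == null ? null : bySubpartition.get(bufferIndex);
    }
}
```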






[jira] [Commented] (FLINK-33232) Kubernetes Operator Not Able to Take Other Python Parameters While Submitting Job Deployment

2023-12-05 Thread chaoran.su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793249#comment-17793249
 ] 

chaoran.su commented on FLINK-33232:


I think this can be done by changing the jobSpec.args. Could you please offer an 
example yaml so that I can check the logic for you? [~amar1509] 

> Kubernetes Operator Not Able to Take Other Python Parameters While Submitting 
> Job Deployment
> 
>
> Key: FLINK-33232
> URL: https://issues.apache.org/jira/browse/FLINK-33232
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.0, kubernetes-operator-1.7.0
>Reporter: Amarjeet Singh
>Priority: Major
> Fix For: 1.17.1
>
>
> The Flink Operator is not able to read Python command options like -pyFiles
> when applying a deployment via Kubernetes on a Flink session cluster. The PyFiles
> are mounted using EFS, and the operator is not able to read the EFS files and apply them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33669][doc] Update the documentation for RestartStrategy, Checkpoint Storage, and State Backend. [flink]

2023-12-05 Thread via GitHub


zhuzhurk closed pull request #23847: [FLINK-33669][doc] Update the 
documentation for RestartStrategy, Checkpoint Storage, and State Backend.
URL: https://github.com/apache/flink/pull/23847


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-33669) Update the documentation for RestartStrategy, Checkpoint Storage, and State Backend.

2023-12-05 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-33669.
---
Fix Version/s: 1.19.0
   Resolution: Done

master:
e27f8a3a0783d551457a2f424b01267bd1c8c2c2
d9bcb3b40ed5cefadbbaf391dacaa0ecd7fc8243
52d8d3583e5c989da84126a8805ab335408c46c2

> Update the documentation for RestartStrategy, Checkpoint Storage, and State 
> Backend.
> 
>
> Key: FLINK-33669
> URL: https://issues.apache.org/jira/browse/FLINK-33669
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Documentation
>Affects Versions: 1.19.0
>Reporter: Junrui Li
>Assignee: Junrui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> After the deprecation of complex Java object getter and setter methods in 
> FLIP-381, Flink now recommends the use of ConfigOptions for the configuration 
> of RestartStrategy, Checkpoint Storage, and State Backend. It is necessary 
> that we update FLINK documentation to clearly instruct users on this new 
> recommended approach.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33502][network] Prevent DiskTierProducerAgent#getSegmentIdByIndexOfFirstBufferInSegment from throwing NPE when the task is released [flink]

2023-12-05 Thread via GitHub


WencongLiu commented on code in PR #23863:
URL: https://github.com/apache/flink/pull/23863#discussion_r1415504026


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java:
##
@@ -223,9 +228,16 @@ private void releaseResources() {
 }
 }
 
-private Integer retrieveFirstBufferIndexInSegment(int subpartitionId, int 
bufferIndex) {
-return firstBufferIndexInSegment.size() > subpartitionId
-? 
firstBufferIndexInSegment.get(subpartitionId).get(bufferIndex)
-: null;
+private Integer getSegmentIdByIndexOfFirstBufferInSegment(int 
subpartitionId, int bufferIndex) {
+Integer segmentId;
+try {
+segmentId = 
firstBufferIndexInSegment.get(subpartitionId).get(bufferIndex);
+} catch (Exception e) {

Review Comment:
I have adjusted the code to ensure that `firstBufferIndexInSegment` remains 
unchanged and only clear each element inside it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-33751] use modules correctly when deserializing json plan [flink]

2023-12-05 Thread via GitHub


shuaiqig opened a new pull request, #23874:
URL: https://github.com/apache/flink/pull/23874

   ## What is the purpose of the change
   
   use modules correctly when deserializing json plan
   
   
   ## Brief change log
   
   fix 2 bugs in deserializing the json plan (see the sketch below):
   1. if 2 Operators were found, the lookup returned empty;
   2. foundOperators was not ordered by module order.
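   
   A hedged sketch of the intended lookup order (generic names only, not the 
actual planner code):
   
   ```
   import java.util.List;
   import java.util.Optional;
   import java.util.function.Function;
   
   final class ModuleOrderedLookup {
       // Return the first operator found while walking the modules in their
       // configured order, instead of bailing out when several modules match.
       static <M, O> Optional<O> lookupInModuleOrder(
               List<M> modulesInOrder, Function<M, Optional<O>> lookupInModule) {
           for (M module : modulesInOrder) {
               Optional<O> candidate = lookupInModule.apply(module);
               if (candidate.isPresent()) {
                   return candidate;
               }
           }
           return Optional.empty();
       }
   }
   ```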
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33751) use modules correctly when deserializing json plan

2023-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33751:
---
Labels: pull-request-available  (was: )

> use modules correctly when deserializing json plan
> --
>
> Key: FLINK-33751
> URL: https://issues.apache.org/jira/browse/FLINK-33751
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: shuaiqi.guo
>Priority: Major
>  Labels: pull-request-available
> Attachments: FLINK-33751.patch
>
>
> when serializing and deserializing a SQL job via the following SQL syntax:
> {code:java}
> COMPILE PLAN ...;
> EXECUTE PLAN ...;{code}
> if there are two modules in the environment, some bugs appeared when calling 
> lookupOptionalSqlOperator():
>  # if 2 Operators were found, it returned empty;
>  # foundOperators was not ordered by module order.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33751] use modules correctly when deserializing json plan [flink]

2023-12-05 Thread via GitHub


flinkbot commented on PR #23874:
URL: https://github.com/apache/flink/pull/23874#issuecomment-1840731716

   
   ## CI report:
   
   * 65142b1c17f91f24aeb126db32c89268a5d1b14a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33713][core] Deprecate RuntimeContext#getExecutionConfig [flink]

2023-12-05 Thread via GitHub


zhuzhurk commented on code in PR #23848:
URL: https://github.com/apache/flink/pull/23848#discussion_r1415539068


##
flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java:
##
@@ -83,10 +86,27 @@ public AbstractRuntimeUDFContext(
 }
 
 @Override
+// The @Override annotation will be removed in FLINK-2.0 version to avoid 
exposing the
+// executionConfig, and the getter method should only be used internally.

Review Comment:
   It's fine to not document this because `AbstractRuntimeUDFContext` is an 
`@Internal` class.
   The annotation will be removed naturally once 
`RuntimeContext#getExecutionConfig()` is removed.



##
flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java:
##
@@ -325,6 +326,19 @@ public void initializeSerializerUnlessSet(ExecutionConfig 
executionConfig) {
 }
 }
 
+@Internal
+public void initializeSerializerUnlessSet(SerializerFactory 
serializerFactory) {

Review Comment:
   Maybe change `initializeSerializerUnlessSet(ExecutionConfig)` to invoke this 
method to avoid logic duplication?
   ```
   public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
       initializeSerializerUnlessSet(
               new SerializerFactory() {
                   @Override
                   public <T> TypeSerializer<T> createSerializer(
                           TypeInformation<T> typeInformation) {
                       return typeInformation.createSerializer(executionConfig);
                   }
               });
   }
   ```



##
flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java:
##
@@ -90,16 +89,6 @@ public void testRuntimeContextAndExecutionConfigParams() 
throws Exception {
 new RichMapFunction() {
 @Override
 public String map(String value) throws Exception {
-Assert.assertTrue(
-1000
-== getRuntimeContext()
-.getExecutionConfig()
-
.getNumberOfExecutionRetries());
-Assert.assertTrue(
-5
-== getRuntimeContext()
-.getExecutionConfig()
-
.getTaskCancellationInterval());

Review Comment:
   We should add some other verifications as a replacement, e.g. object reuse.
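   
   One possible replacement assertion, sketched below. It assumes the test setup 
enables object reuse via `env.getConfig().enableObjectReuse()`, and it still goes 
through the deprecated accessor, so it only works during the migration period:
   
   ```
   new RichMapFunction<String, String>() {
       @Override
       public String map(String value) {
           // Assumes object reuse was enabled in the test setup.
           Assert.assertTrue(
                   getRuntimeContext().getExecutionConfig().isObjectReuseEnabled());
           return value;
       }
   };
   ```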



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33470] Implement restore tests for Join node [flink]

2023-12-05 Thread via GitHub


dawidwys commented on code in PR #23869:
URL: https://github.com/apache/flink/pull/23869#discussion_r1415600246


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java:
##
@@ -117,6 +122,11 @@ public EnumSet supportedRunSteps() {
 return EnumSet.of(TestKind.SQL);
 }
 
+@BeforeEach
+public void clearData() {

Review Comment:
   Nice find!
   
   I think we should be consistent with how it's done in other places. 
`JsonPlanTestBase` does it in `AfterEach`. If we reuse a name between those 
two, `JsonPlanTestBase` can fail if executed after `RestoreTestBase`.
   
   Could we also clear the `localRawResultsObservers` here?
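   
   E.g. a sketch of the unified cleanup (assuming the body currently delegates 
to `TestValuesTableFactory.clearAllData()`):
   
   ```
   @AfterEach
   public void clearData() {
       TestValuesTableFactory.clearAllData();
       // also drop observers registered during the restore stage
       localRawResultsObservers.clear();
   }
   ```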



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLIP-321] Update the docs to add migration periods for deprecated APIs. [flink]

2023-12-05 Thread via GitHub


pvary commented on PR #23865:
URL: https://github.com/apache/flink/pull/23865#issuecomment-1840787769

   I would suggest changing the 
[previous](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#api-compatibility-guarantees)
 paragraph as follows (removing the API-compatibility-related parts from 
there):
   
   The top part could remain the same, just the title changed to `Compatibility 
guarantees`:
   ```
   Compatibility guarantees
   
   The classes & members of the Java/Scala APIs that are intended for users are 
annotated
   with the following stability annotations:
   
   - Public
   - PublicEvolving
   - Experimental
   
   Annotations on a class also apply to all members of that class, unless 
otherwise annotated.
   Any API without such an annotation is considered internal to Flink, with no 
guarantees being provided.
   ```
   
   Then we should create 2 sub-paragraphs.
   The 1st one could be `Binary compatibility`:
   ```
   We guarantee binary compatibility only between patch releases for Public, 
and PublicEvolving interfaces.
   
   Example: Code written against Public and PublicEvolving interfaces in 1.15.2 
will continue to run in 1.15.3,
   without having to recompile the code.
   That same code would have to be recompiled when upgrading to 1.16.0 though, 
even if no code change is
   required based on the API compatibility guarantees.
   ```
   
   The 2nd one could be `API compatibility`, and here could come the paragraph 
written by @becketqin 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32894] Use 3.3.0 for maven-shade-plugin to support Java 17 [flink-connector-shared-utils]

2023-12-05 Thread via GitHub


echauchot commented on PR #20:
URL: 
https://github.com/apache/flink-connector-shared-utils/pull/20#issuecomment-1840799581

   @snuyanzin as you already started, do you want to take this review ?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]

2023-12-05 Thread via GitHub


mxm commented on PR #721:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/721#issuecomment-1840802299

   PTAL @gyfora 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]

2023-12-05 Thread via GitHub


mxm commented on code in PR #721:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/721#discussion_r1415619399


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##
@@ -180,13 +190,28 @@ public void reconcile(FlinkResourceContext ctx) 
throws Exception {
 }
 }
 
-private void applyAutoscaler(FlinkResourceContext ctx) throws 
Exception {
+private void applyAutoscaler(FlinkResourceContext ctx, @Nullable 
String existingOverrides)
+throws Exception {
 var autoScalerCtx = ctx.getJobAutoScalerContext();
 boolean autoscalerEnabled =
 ctx.getResource().getSpec().getJob() != null
 && 
ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED);
 autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, 
autoscalerEnabled);
+
 autoscaler.scale(autoScalerCtx);
+
+// Check that the overrides actually changed and not merely the String 
representation

Review Comment:
   This logic has been moved. I think it is easier to understand now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33667] Implement restore tests for MatchRecognize node [flink]

2023-12-05 Thread via GitHub


dawidwys commented on code in PR #23821:
URL: https://github.com/apache/flink/pull/23821#discussion_r1415623981


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeTestPrograms.java:
##
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecMatch}. 
*/
+public class MatchRecognizeTestPrograms {
+static final Row[] SIMPLE_DATA = {
+Row.of(1L, "a"),
+Row.of(2L, "z"),
+Row.of(3L, "b"),
+Row.of(4L, "c"),
+Row.of(5L, "d"),
+Row.of(6L, "a"),
+Row.of(7L, "b"),
+Row.of(8L, "c"),
+Row.of(9L, "a"),
+Row.of(10L, "b")
+};
+
+static final Row[] SIMPLE_DATA2 = {Row.of(11L, "c")};
+
+static final Row[] COMPLEX_DATA = {
+Row.of("ACME", 1L, 19, 1),
+Row.of("BETA", 2L, 18, 1),
+Row.of("ACME", 3L, 17, 2),
+Row.of("ACME", 4L, 13, 3),
+Row.of("BETA", 5L, 16, 2),
+Row.of("ACME", 6L, 20, 4)
+};
+
+static final Row[] COMPLEX_DATA2 = {Row.of("BETA", 7L, 22, 4)};
+
+static final TableTestProgram SIMPLE_MATCH =
+TableTestProgram.of("simple-match", "simple match recognize test")
+.setupTableSource(
+SourceTestStep.newBuilder("MyTable")
+.addSchema(
+"id bigint", "name varchar", 
"proctime as PROCTIME()")
+.producedBeforeRestore(SIMPLE_DATA)
+.producedAfterRestore(SIMPLE_DATA2)
+.build())
+.setupTableSink(
+SinkTestStep.newBuilder("MySink")
+.addSchema("a bigint", "b bigint", "c 
bigint")
+.consumedBeforeRestore(Row.of(6L, 7L, 8L))
+.consumedAfterRestore(Row.of(9L, 10L, 11L))
+.build())
+.runSql(
+"insert into MySink"
++ " SELECT T.aid, T.bid, T.cid\n"
++ " FROM MyTable MATCH_RECOGNIZE (\n"
++ " ORDER BY proctime\n"
++ " MEASURES\n"
++ " `A\"`.id AS aid,\n"
++ " \u006C.id AS bid,\n"
++ " C.id AS cid\n"
++ " PATTERN (`A\"` \u006C C)\n"
++ " DEFINE\n"
++ " `A\"` AS name = 'a',\n"
++ " \u006C AS name = 
'b',\n"
++ " C AS name = 'c'\n"
++ " ) AS T")
+.build();
+
+static final TableTestProgram COMPLEX_MATCH =
+TableTestProgram.of("complex-match", "complex match recognize 
test")

Review Comment:
   Could you please adapt the id according to the javadoc:
   
https://github.com/apache/flink/blob/52d8d3583e5c989da84126a8805ab335408c46c2/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java#L117
   ?



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeTestPrograms.java:
##
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with thi

Re: [PR] [FLINK-33470] Implement restore tests for Join node [flink]

2023-12-05 Thread via GitHub


dawidwys commented on code in PR #23869:
URL: https://github.com/apache/flink/pull/23869#discussion_r1415628238


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/JoinTestPrograms.java:
##
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.testutils;
+
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecJoin}. */
+public class JoinTestPrograms {
+static final TableTestProgram NON_WINDOW_INNER_JOIN;
+static final TableTestProgram NON_WINDOW_INNER_JOIN_WITH_NULL;
+static final TableTestProgram CROSS_JOIN;
+static final TableTestProgram JOIN_WITH_FILTER;
+static final TableTestProgram INNER_JOIN_WITH_DUPLICATE_KEY;
+static final TableTestProgram INNER_JOIN_WITH_NON_EQUI_JOIN;
+static final TableTestProgram INNER_JOIN_WITH_EQUAL_PK;
+static final TableTestProgram INNER_JOIN_WITH_PK;
+static final TableTestProgram LEFT_JOIN;
+static final TableTestProgram FULL_OUTER;
+static final TableTestProgram RIGHT_JOIN;
+static final TableTestProgram SEMI_JOIN;
+static final TableTestProgram ANTI_JOIN;
+
+static final SourceTestStep EMPLOYEE =
+SourceTestStep.newBuilder("EMPLOYEE")
+.addSchema("deptno int", "salary bigint", "name varchar")
+.addOption("filterable-fields", "salary")
+.producedBeforeRestore(
+Row.of(null, 101L, "Adam"),
+Row.of(1, 1L, "Baker"),
+Row.of(2, 2L, "Charlie"),
+Row.of(3, 2L, "Don"),
+Row.of(7, 6L, "Victor"))
+.producedAfterRestore(
+Row.of(4, 3L, "Juliet"),
+Row.of(4, 4L, "Helena"),
+Row.of(1, 1L, "Ivana"))
+.build();
+
+static final SourceTestStep DEPARTMENT =
+SourceTestStep.newBuilder("DEPARTMENT")
+.addSchema(
+"department_num int", "b2 bigint", "b3 int", 
"department_name varchar")
+.producedBeforeRestore(
+Row.of(null, 102L, 0, "Accounting"),
+Row.of(1, 1L, 0, "Research"),
+Row.of(2, 2L, 1, "Human Resources"),
+Row.of(2, 3L, 2, "HR"),
+Row.of(3, 1L, 2, "Sales"))
+.producedAfterRestore(
+Row.of(2, 4L, 3, "People Operations"), Row.of(4, 
2L, 4, "Engineering"))
+.build();
+
+static final SourceTestStep DEPARTMENT_NONULLS =
+SourceTestStep.newBuilder("DEPARTMENT")
+.addSchema(
+"department_num int", "b2 bigint", "b3 int", 
"department_name varchar")
+.producedBeforeRestore(
+Row.of(1, 1L, 0, "Research"),
+Row.of(2, 2L, 1, "Human Resources"),
+Row.of(2, 3L, 2, "HR"),
+Row.of(3, 1L, 2, "Sales"))
+.producedAfterRestore(Row.of(2, 4L, 3, "People 
Operations"))
+.build();
+static final SourceTestStep SOURCE_T1 =
+SourceTestStep.newBuilder("T1")
+.addSchema("a int", "b bigint", "c varchar")
+.producedBeforeRestore(
+Row.of(1, 1L, "Baker1"),
+Row.of(1, 2L, "Baker2"),
+Row.of(1, 2L, "Baker2"),
+Row.of(1, 5L, "Baker3"),
+Row.of(2, 7L, "Baker5"),
+Row.of(1, 9L, "Baker6"

Re: [PR] [FLINK-33470] Implement restore tests for Join node [flink]

2023-12-05 Thread via GitHub


dawidwys commented on code in PR #23869:
URL: https://github.com/apache/flink/pull/23869#discussion_r1415631030


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java:
##
@@ -117,6 +122,11 @@ public EnumSet supportedRunSteps() {
 return EnumSet.of(TestKind.SQL);
 }
 
+@BeforeEach
+public void clearData() {

Review Comment:
   Some food for thought: would it make sense to add some unique identifier to 
all the sinks/sources in `RestoreTestBase`? We could use e.g. the test id or 
generate some UUID for the duration of a test run.
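   
   For instance (a rough sketch of the UUID variant; the helper is hypothetical):
   
   ```
   // One identifier per test run, appended to every source/sink name.
   private static final String RUN_ID =
           UUID.randomUUID().toString().replace("-", "");
   
   private static String uniqueName(String base) {
       return base + "_" + RUN_ID;
   }
   ```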



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-33755) Cleanup usage of deprecated StreamExecutionEnvironment#generateSequence

2023-12-05 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-33755:
---

 Summary: Cleanup usage of deprecated 
StreamExecutionEnvironment#generateSequence
 Key: FLINK-33755
 URL: https://issues.apache.org/jira/browse/FLINK-33755
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Sergey Nuyanzin
Assignee: Sergey Nuyanzin






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33755] Cleanup usage of deprecated StreamExecutionEnvironment#generateSequence [flink]

2023-12-05 Thread via GitHub


snuyanzin opened a new pull request, #23875:
URL: https://github.com/apache/flink/pull/23875

   
   ## What is the purpose of the change 
   
   This PR replaces usages of `StreamExecutionEnvironment#generateSequence` 
with `StreamExecutionEnvironment#fromSequence`, as recommended in the javadoc.
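   
   A minimal before/after sketch:
   
   ```
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
   // before (deprecated):
   DataStream<Long> before = env.generateSequence(0L, 999L);
   
   // after, as recommended by the javadoc:
   DataStream<Long> after = env.fromSequence(0L, 999L);
   ```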
   
   ## Verifying this change
   
   
   This change is a trivial rework
   This change is already covered by existing tests
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no )
 - The runtime per-record code paths (performance sensitive): (no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no )
 - The S3 file system connector: (no )
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33755) Cleanup usage of deprecated StreamExecutionEnvironment#generateSequence

2023-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33755:
---
Labels: pull-request-available  (was: )

> Cleanup usage of deprecated StreamExecutionEnvironment#generateSequence
> ---
>
> Key: FLINK-33755
> URL: https://issues.apache.org/jira/browse/FLINK-33755
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-05 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1415650282


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/GlobalViewDeclarativeSlotPoolBridge.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService;
+import org.apache.flink.runtime.taskexecutor.slot.TimeoutListener;
+import org.apache.flink.runtime.taskexecutor.slot.TimerService;
+import org.apache.flink.runtime.util.ResourceCounter;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.clock.Clock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+/**
+ * {@link SlotPool} implementation which could use the {@link 
GlobalViewDeclarativeSlotPoolBridge}
+ * to allocate slots in a global view. Note: It's only used for streaming mode 
now.
+ */
+public class GlobalViewDeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolBridge
+implements TimeoutListener {
+
+public static final Logger LOG =
+LoggerFactory.getLogger(GlobalViewDeclarativeSlotPoolBridge.class);
+private final Map increasedResourceRequirements;
+
+private final TimerService 
timerService;
+
+private final @Nonnull Set receivedNewSlots;
+
+private final @Nonnull Map 
preFulfilledFromAvailableSlots;
+private final Time slotRequestMaxInterval;
+
+public GlobalViewDeclarativeSlotPoolBridge(
+JobID jobId,
+DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+Clock clock,
+Time rpcTimeout,
+Time idleSlotTimeout,
+Time batchSlotTimeout,
+Time slotRequestMaxInterval,
+RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
+super(
+jobId,
+declarativeSlotPoolFactory,
+clock,
+rpcTimeout,
+idleSlotTimeout,
+batchSlotTimeout,
+requestSlotMatchingStrategy);
+this.slotRequestMaxInterval = 
Preconditions.checkNotNull(slotRequestMaxInterval);
+this.receivedNewSlots = new HashSet<>();
+this.preFulfilledFromAvailableSlots = new HashMap<>();
+this.increasedResourceRequirements = new HashMap<>();
+this.timerService =
+new DefaultTimerService<>(
+new ScheduledThreadPoolExecutor(1),

Review Comment:
   +1 for the proposal.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33755) Cleanup usage of deprecated StreamExecutionEnvironment#generateSequence

2023-12-05 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-33755:

Component/s: (was: Table SQL / Planner)

> Cleanup usage of deprecated StreamExecutionEnvironment#generateSequence
> ---
>
> Key: FLINK-33755
> URL: https://issues.apache.org/jira/browse/FLINK-33755
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]

2023-12-05 Thread via GitHub


1996fanrui commented on code in PR #721:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/721#discussion_r1415657657


##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java:
##
@@ -30,12 +31,23 @@
 public class KubernetesScalingRealizerTest {
 
 @Test
-public void testAutoscalerOverridesVertexIdsAreSorted() {
-
+public void 
testAutoscalerOverridesStringDoesNotChangeUnlessOverridesChange() {
 KubernetesJobAutoScalerContext ctx =
 TestingKubernetesAutoscalerUtils.createContext("test", null);
+FlinkDeployment resource = (FlinkDeployment) ctx.getResource();
+
+// Create resource with existing parallelism overrides
+resource.getSpec()
+.getFlinkConfiguration()
+.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), "a:1,b:2");

Review Comment:
   Could we test `"a:1,b:2"` and `"b:2,a:1"`?
   
   Also, the `assertThat` has `"a:1,b:2"` as well; it's better to extract a field.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33755] Cleanup usage of deprecated StreamExecutionEnvironment#generateSequence [flink]

2023-12-05 Thread via GitHub


flinkbot commented on PR #23875:
URL: https://github.com/apache/flink/pull/23875#issuecomment-1840847783

   
   ## CI report:
   
   * 4acb9f182b43cf89786baa73e7ee1407a12cf58c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]

2023-12-05 Thread via GitHub


1996fanrui commented on code in PR #721:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/721#discussion_r1415657657


##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java:
##
@@ -30,12 +31,23 @@
 public class KubernetesScalingRealizerTest {
 
 @Test
-public void testAutoscalerOverridesVertexIdsAreSorted() {
-
+public void 
testAutoscalerOverridesStringDoesNotChangeUnlessOverridesChange() {
 KubernetesJobAutoScalerContext ctx =
 TestingKubernetesAutoscalerUtils.createContext("test", null);
+FlinkDeployment resource = (FlinkDeployment) ctx.getResource();
+
+// Create resource with existing parallelism overrides
+resource.getSpec()
+.getFlinkConfiguration()
+.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), "a:1,b:2");

Review Comment:
   Could we test `"a:1,b:2"` and `"b:2,a:1"`?
   
   Also, the `assertThat` has `"a:1,b:2"` as well; it's better to extract a field.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-05 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1415663600


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/GlobalViewDeclarativeSlotPoolBridge.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService;
+import org.apache.flink.runtime.taskexecutor.slot.TimeoutListener;
+import org.apache.flink.runtime.taskexecutor.slot.TimerService;
+import org.apache.flink.runtime.util.ResourceCounter;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.clock.Clock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+/**
+ * {@link SlotPool} implementation which could use the {@link 
GlobalViewDeclarativeSlotPoolBridge}
+ * to allocate slots in a global view. Note: It's only used for streaming mode 
now.
+ */
+public class GlobalViewDeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolBridge
+implements TimeoutListener {
+
+public static final Logger LOG =
+LoggerFactory.getLogger(GlobalViewDeclarativeSlotPoolBridge.class);
+private final Map increasedResourceRequirements;
+
+private final TimerService 
timerService;
+
+private final @Nonnull Set receivedNewSlots;
+
+private final @Nonnull Map 
preFulfilledFromAvailableSlots;
+private final Time slotRequestMaxInterval;
+
+public GlobalViewDeclarativeSlotPoolBridge(
+JobID jobId,
+DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+Clock clock,
+Time rpcTimeout,
+Time idleSlotTimeout,
+Time batchSlotTimeout,
+Time slotRequestMaxInterval,
+RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
+super(
+jobId,
+declarativeSlotPoolFactory,
+clock,
+rpcTimeout,
+idleSlotTimeout,
+batchSlotTimeout,
+requestSlotMatchingStrategy);
+this.slotRequestMaxInterval = 
Preconditions.checkNotNull(slotRequestMaxInterval);
+this.receivedNewSlots = new HashSet<>();
+this.preFulfilledFromAvailableSlots = new HashMap<>();
+this.increasedResourceRequirements = new HashMap<>();
+this.timerService =
+new DefaultTimerService<>(

Review Comment:
   thanks for the comment.
   SGTM +1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33667] Implement restore tests for MatchRecognize node [flink]

2023-12-05 Thread via GitHub


jnh5y commented on code in PR #23821:
URL: https://github.com/apache/flink/pull/23821#discussion_r1415662216


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeTestPrograms.java:
##
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecMatch}. 
*/
+public class MatchRecognizeTestPrograms {
+static final Row[] SIMPLE_DATA = {
+Row.of(1L, "a"),
+Row.of(2L, "z"),
+Row.of(3L, "b"),
+Row.of(4L, "c"),
+Row.of(5L, "d"),
+Row.of(6L, "a"),
+Row.of(7L, "b"),
+Row.of(8L, "c"),
+Row.of(9L, "a"),
+Row.of(10L, "b")
+};
+
+static final Row[] SIMPLE_DATA2 = {Row.of(11L, "c")};
+
+static final Row[] COMPLEX_DATA = {
+Row.of("ACME", 1L, 19, 1),
+Row.of("BETA", 2L, 18, 1),
+Row.of("ACME", 3L, 17, 2),
+Row.of("ACME", 4L, 13, 3),
+Row.of("BETA", 5L, 16, 2),
+Row.of("ACME", 6L, 20, 4)
+};
+
+static final Row[] COMPLEX_DATA2 = {Row.of("BETA", 7L, 22, 4)};
+
+static final TableTestProgram SIMPLE_MATCH =
+TableTestProgram.of("simple-match", "simple match recognize test")
+.setupTableSource(
+SourceTestStep.newBuilder("MyTable")
+.addSchema(
+"id bigint", "name varchar", 
"proctime as PROCTIME()")
+.producedBeforeRestore(SIMPLE_DATA)
+.producedAfterRestore(SIMPLE_DATA2)
+.build())
+.setupTableSink(
+SinkTestStep.newBuilder("MySink")
+.addSchema("a bigint", "b bigint", "c 
bigint")
+.consumedBeforeRestore(Row.of(6L, 7L, 8L))
+.consumedAfterRestore(Row.of(9L, 10L, 11L))
+.build())
+.runSql(
+"insert into MySink"
++ " SELECT T.aid, T.bid, T.cid\n"
++ " FROM MyTable MATCH_RECOGNIZE (\n"
++ " ORDER BY proctime\n"
++ " MEASURES\n"
++ " `A\"`.id AS aid,\n"
++ " \u006C.id AS bid,\n"
++ " C.id AS cid\n"
++ " PATTERN (`A\"` \u006C C)\n"
++ " DEFINE\n"
++ " `A\"` AS name = 'a',\n"
++ " \u006C AS name = 
'b',\n"
++ " C AS name = 'c'\n"
++ " ) AS T")
+.build();
+
+static final TableTestProgram COMPLEX_MATCH =
+TableTestProgram.of("complex-match", "complex match recognize 
test")

Review Comment:
   Ah, yes; didn't see that.  I was reusing identifiers from the existing tests!
   
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]

2023-12-05 Thread via GitHub


gyfora commented on code in PR #721:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/721#discussion_r1415668748


##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java:
##
@@ -30,12 +31,23 @@
 public class KubernetesScalingRealizerTest {
 
 @Test
-public void testAutoscalerOverridesVertexIdsAreSorted() {
-
+public void 
testAutoscalerOverridesStringDoesNotChangeUnlessOverridesChange() {
 KubernetesJobAutoScalerContext ctx =
 TestingKubernetesAutoscalerUtils.createContext("test", null);
+FlinkDeployment resource = (FlinkDeployment) ctx.getResource();
+
+// Create resource with existing parallelism overrides
+resource.getSpec()
+.getFlinkConfiguration()
+.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), "a:1,b:2");

Review Comment:
   Would it make sense to reverse this to `"b:2,a:1"` so that the test actually 
fails without the change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-05 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1415668887


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/GlobalViewDeclarativeSlotPoolBridge.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService;
+import org.apache.flink.runtime.taskexecutor.slot.TimeoutListener;
+import org.apache.flink.runtime.taskexecutor.slot.TimerService;
+import org.apache.flink.runtime.util.ResourceCounter;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.clock.Clock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+/**
+ * {@link SlotPool} implementation which could use the {@link 
GlobalViewDeclarativeSlotPoolBridge}
+ * to allocate slots in a global view. Note: It's only used for streaming mode 
now.
+ */
+public class GlobalViewDeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolBridge
+implements TimeoutListener {
+
+public static final Logger LOG =
+LoggerFactory.getLogger(GlobalViewDeclarativeSlotPoolBridge.class);
+private final Map increasedResourceRequirements;
+
+private final TimerService 
timerService;
+
+private final @Nonnull Set receivedNewSlots;
+
+private final @Nonnull Map 
preFulfilledFromAvailableSlots;
+private final Time slotRequestMaxInterval;
+
+public GlobalViewDeclarativeSlotPoolBridge(
+JobID jobId,
+DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+Clock clock,
+Time rpcTimeout,
+Time idleSlotTimeout,
+Time batchSlotTimeout,
+Time slotRequestMaxInterval,
+RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
+super(
+jobId,
+declarativeSlotPoolFactory,
+clock,
+rpcTimeout,
+idleSlotTimeout,
+batchSlotTimeout,
+requestSlotMatchingStrategy);
+this.slotRequestMaxInterval = 
Preconditions.checkNotNull(slotRequestMaxInterval);
+this.receivedNewSlots = new HashSet<>();
+this.preFulfilledFromAvailableSlots = new HashMap<>();
+this.increasedResourceRequirements = new HashMap<>();
+this.timerService =
+new DefaultTimerService<>(
+new ScheduledThreadPoolExecutor(1),
+slotRequestMaxInterval.toMilliseconds());
+}
+
+@Override
+protected void internalRequestNewAllocatedSlot(PendingRequest 
pendingRequest) {
+pendingRequests.put(pendingRequest.getSlotRequestId(), pendingRequest);
+increasedResourceRequirements.put(pendingRequest.getSlotRequestId(), 
false);
+
+timerService.registerTimeout(
+this, slotRequestMaxInterval.getSize(), 
slotRequestMaxInterval.getUnit());
+}
+
+@Override
+void newSlotsAreAvailable(Collection newSlots) {
+receivedNewSlots.addAll(newSlots);
+if (newSlots.isEmpty() && receivedNewSlots.isEmpty()) {
+// TODO: Do the matching logic only for available slots.
+} 

Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]

2023-12-05 Thread via GitHub


1996fanrui commented on code in PR #721:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/721#discussion_r1415657657


##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java:
##
@@ -30,12 +31,23 @@
 public class KubernetesScalingRealizerTest {
 
 @Test
-public void testAutoscalerOverridesVertexIdsAreSorted() {
-
+public void 
testAutoscalerOverridesStringDoesNotChangeUnlessOverridesChange() {
 KubernetesJobAutoScalerContext ctx =
 TestingKubernetesAutoscalerUtils.createContext("test", null);
+FlinkDeployment resource = (FlinkDeployment) ctx.getResource();
+
+// Create resource with existing parallelism overrides
+resource.getSpec()
+.getFlinkConfiguration()
+.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), "a:1,b:2");

Review Comment:
   Could we test `"a:1,b:2"` and `"b:2,a:1"`?
   
   Also, the `assertThat` have  `"a:1,b:2"`  as well, it's  better to extract  
a variable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33470] Implement restore tests for Join node [flink]

2023-12-05 Thread via GitHub


jnh5y commented on code in PR #23869:
URL: https://github.com/apache/flink/pull/23869#discussion_r1415682052


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java:
##
@@ -117,6 +122,11 @@ public EnumSet supportedRunSteps() {
 return EnumSet.of(TestKind.SQL);
 }
 
+@BeforeEach
+public void clearData() {

Review Comment:
   >Some food for thought: would it make sense to add some unique identifier to 
all the sinks/sources in RestoreTestBase? We could use e.g. the test id or 
generate some UUID for the duration of a test run.
   
   I thought about trying to do that.  Seems like we'd have to do it manually 
in the test cases or update the SQL somehow if we were doing it in the code.  I 
didn't see an easy way to sort that out.



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java:
##
@@ -117,6 +122,11 @@ public EnumSet supportedRunSteps() {
 return EnumSet.of(TestKind.SQL);
 }
 
+@BeforeEach
+public void clearData() {

Review Comment:
   >Nice find!
   
   Thanks!
   
   > I think we should be consistent with how it's done in other places. 
JsonPlanTestBase does it in AfterEach. If we reuse a name between those two, 
JsonPlanTestBase can fail if executed after RestoreTestBase.
   
   Likewise; good catch!  I've updated it to use the AfterEach annotation.  I 
like that.
   
   >Could we also clear the localRawResultsObservers here?
   
   Makes sense.  This also covers clearing any observers configured during the 
restoreTest stage.  Pushing that change...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]

2023-12-05 Thread via GitHub


1996fanrui commented on code in PR #721:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/721#discussion_r1415688264


##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java:
##
@@ -30,12 +31,23 @@
 public class KubernetesScalingRealizerTest {
 
 @Test
-public void testAutoscalerOverridesVertexIdsAreSorted() {
-
+public void 
testAutoscalerOverridesStringDoesNotChangeUnlessOverridesChange() {
 KubernetesJobAutoScalerContext ctx =
 TestingKubernetesAutoscalerUtils.createContext("test", null);
+FlinkDeployment resource = (FlinkDeployment) ctx.getResource();
+
+// Create resource with existing parallelism overrides
+resource.getSpec()
+.getFlinkConfiguration()
+.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), "a:1,b:2");

Review Comment:
   It's HashMap instead of TreeMap or SortedMap, so the ordering cannot be 
determined, right?
   
   If so, I prefer to test both `"a:1,b:2"` and `"b:2,a:1"`, wdyt?
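   
   E.g. a sketch covering both orderings (`resource` and `PipelineOptions` as in 
the quoted test; the assertion wording is illustrative):
   
   ```
   for (String existing : new String[] {"a:1,b:2", "b:2,a:1"}) {
       resource.getSpec()
               .getFlinkConfiguration()
               .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), existing);
       // run the realizer, then assert the stored string still equals `existing`
   }
   ```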



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Flink 31966] Flink Kubernetes operator lacks TLS support [flink-kubernetes-operator]

2023-12-05 Thread via GitHub


gyfora commented on code in PR #712:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/712#discussion_r1415689682


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
 @@ -1075,4 +1106,59 @@ protected void updateStatusAfterClusterDeletion(FlinkDeploymentStatus status) {
         status.getJobStatus().setState(JobStatus.FINISHED.name());
     }
 }
+
+    private Configuration getOperatorRestConfig(Configuration origConfig) throws IOException {
+        Configuration conf = new Configuration(origConfig);
+        EnvUtils.get(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH)
+                .ifPresent(
+                        path -> {
+                            if (Files.notExists(Paths.get(path))) {
+                                return;
+                            }
+                            conf.set(
+                                    SecurityOptions.SSL_REST_TRUSTSTORE,
+                                    EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH));
+                            conf.set(
+                                    SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD,
+                                    EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD));
+                            if (SecurityOptions.isRestSSLAuthenticationEnabled(conf)
+                                    && EnvUtils.get(EnvUtils.ENV_OPERATOR_KEYSTORE_PATH)
+                                            .isPresent()) {
+                                conf.set(
+                                        SecurityOptions.SSL_REST_KEYSTORE,
+                                        EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PATH));
+                                conf.set(
+                                        SecurityOptions.SSL_REST_KEYSTORE_PASSWORD,
+                                        EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD));
+                                conf.set(
+                                        SecurityOptions.SSL_REST_KEY_PASSWORD,
+                                        EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD));
+                            } else {
+                                conf.removeConfig(SecurityOptions.SSL_REST_KEYSTORE);
+                                conf.removeConfig(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD);
+                            }
+                            conf.removeConfig(SecurityOptions.SSL_TRUSTSTORE);
+                            conf.removeConfig(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
+                            conf.removeConfig(SecurityOptions.SSL_KEYSTORE);
+                            conf.removeConfig(SecurityOptions.SSL_KEYSTORE_PASSWORD);
+                        });
+        return conf;
+    }
+
+    private boolean isValidRuntimeException(Configuration conf, RuntimeException e) {

Review Comment:
   Please see my previous comment; I think we should get rid of this logic completely.



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
 @@ -195,8 +199,25 @@ public void submitApplicationCluster(
         if (requireHaMetadata) {
             validateHaMetadataExists(conf);
         }
+        try {
+            deployApplicationCluster(jobSpec, removeOperatorConfigs(conf));
+        } catch (RuntimeException e) {
+            LOG.warn("Caught Exception " + e.getMessage());
+            if (!isValidRuntimeException(conf, e)) {
+                throw e;
+            }
+        }

Review Comment:
   I am very confused by this logic. Can we keep the previous behavior and just let the exceptions fall through? What's the point of logging them? Deploy errors should result in errors.



##
helm/flink-kubernetes-operator/templates/flink-operator.yaml:
##
@@ -229,6 +244,12 @@ spec:
 - key: keystore.p12
   path: keystore.p12
 {{- end }}
+{{- if .Values.tls.create }}
+- name: flink-operator-cert-secret
+  secret:
+secretName: {{ .Values.tls.certSecret }}

Review Comment:
   this property could be called `.Values.tls.secretName` to be consistent



##
helm/flink-kubernetes-operator/templates/flink-operator.yaml:
##
 @@ -110,6 +110,17 @@ spec:
   value: -Dlog4j.configurationFile=/opt/flink/conf/log4j-operator.properties
 - name: JVM_ARGS
   value: {{ .Values.jvmArgs.operator }}
+{{- if .Values.tls.create }}
+- name: OPERATOR_KEYSTORE_PATH
+  value: /opt/flink/artifacts/operator-cert/keystore.jks
+- name: OPERATOR_TRUSTSTORE_PATH
+  value: /opt/flink/artifacts/operator-cert/truststore.jks
+

Re: [PR] [FLINK-33667] Implement restore tests for MatchRecognize node [flink]

2023-12-05 Thread via GitHub


jnh5y commented on PR #23821:
URL: https://github.com/apache/flink/pull/23821#issuecomment-1840893823

   > LGTM % the ids issue, I'll merge the PR after #23869 which is necessary for the cleanup.
   
   FWIW, MR observes the changelog, not the final results.  I *think* it should be OK to merge before or after the Join PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-05 Thread via GitHub


RocMarshal commented on PR #23635:
URL: https://github.com/apache/flink/pull/23635#issuecomment-1840891716

   Thank you @1996fanrui @KarmaGYZ very much for the review!
   
   I have re-evaluated where the waiting mechanisms should be implemented, based on @KarmaGYZ's offline suggestions.
   
   If the two waiting mechanisms are placed in DeclarativeSlotPool, the information to maintain would be more precise and concise.
   
   - The maintenance of reserved slot/resource profiles should be simpler and more intuitive.
   
   If we can reach an agreement on it, I would like to confirm whether we should still use `mainThreadExecutor` to implement the timeout-based waiting mechanism for the check. If so, this may require changing the `create` method of `DeclarativeSlotPoolFactory`.
   
   Please let me know your opinions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Flink 31966] Flink Kubernetes operator lacks TLS support [flink-kubernetes-operator]

2023-12-05 Thread via GitHub


gaborgsomogyi commented on code in PR #712:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/712#discussion_r1415671557


##
helm/flink-kubernetes-operator/templates/flink-operator.yaml:
##
@@ -130,6 +141,10 @@ spec:
 - name: flink-artifacts-volume
   mountPath: /opt/flink/artifacts
 {{- end }}
+  {{- if .Values.tls.create }}
+- name: flink-operator-cert-secret
+  mountPath: /opt/flink/artifacts/operator-cert

Review Comment:
   AFAIK, when there is an overlap in the mounts, one of them is going to be hidden (either `/opt/flink/artifacts` or `/opt/flink/artifacts/operator-cert`). Maybe `/opt/flink/certs` would be a better choice.



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
 @@ -1075,4 +1106,59 @@ protected void updateStatusAfterClusterDeletion(FlinkDeploymentStatus status) {
         status.getJobStatus().setState(JobStatus.FINISHED.name());
     }
 }
+
+    private Configuration getOperatorRestConfig(Configuration origConfig) throws IOException {
+        Configuration conf = new Configuration(origConfig);
+        EnvUtils.get(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH)
+                .ifPresent(
+                        path -> {
+                            if (Files.notExists(Paths.get(path))) {
+                                return;
+                            }
+                            conf.set(
+                                    SecurityOptions.SSL_REST_TRUSTSTORE,
+                                    EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH));
+                            conf.set(
+                                    SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD,
+                                    EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD));
+                            if (SecurityOptions.isRestSSLAuthenticationEnabled(conf)
+                                    && EnvUtils.get(EnvUtils.ENV_OPERATOR_KEYSTORE_PATH)
+                                            .isPresent()) {
+                                conf.set(
+                                        SecurityOptions.SSL_REST_KEYSTORE,
+                                        EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PATH));
+                                conf.set(
+                                        SecurityOptions.SSL_REST_KEYSTORE_PASSWORD,
+                                        EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD));
+                                conf.set(
+                                        SecurityOptions.SSL_REST_KEY_PASSWORD,
+                                        EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD));
+                            } else {
+                                conf.removeConfig(SecurityOptions.SSL_REST_KEYSTORE);
+                                conf.removeConfig(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD);
+                            }
+                            conf.removeConfig(SecurityOptions.SSL_TRUSTSTORE);
+                            conf.removeConfig(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
+                            conf.removeConfig(SecurityOptions.SSL_KEYSTORE);
+                            conf.removeConfig(SecurityOptions.SSL_KEYSTORE_PASSWORD);
+                        });
+        return conf;
+    }
+
+    private boolean isValidRuntimeException(Configuration conf, RuntimeException e) {
+        final Optional<String> trustStorePath = EnvUtils.get(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH);
+        // The ClusterDescriptors always try and create a RestClient from the config
+        // that would be given to the deployment. When SSL is enabled it will throw
+        // a ClusterRetrieveException as the operator does not have the certs where they
+        // would be mounted on the client
+        if (SecurityOptions.isRestSSLEnabled(conf)
+                && trustStorePath.isPresent()
+                && Files.exists(Paths.get(trustStorePath.get()))
+                && e.getCause() instanceof ClusterRetrieveException) {
+            return true;
+        }
+        return false;

Review Comment:
   I don't understand this functionality at all 🤔 How I imagine this feature from a high-level perspective is the following:
   * One configures certs for the operator
   * One configures either an unsecure or a secure connection per job
   * The operator picks up the job config
   * If unsecure, it opens an unsecure REST client
   * If secure, it opens a secure REST client
   * If any connection fails, then we shouldn't catch exceptions
   
   If you could elaborate, it would be helpful.
   



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/

Re: [PR] [FLINK-33441] Implement restore tests for ExecUnion node [flink]

2023-12-05 Thread via GitHub


jnh5y commented on code in PR #23653:
URL: https://github.com/apache/flink/pull/23653#discussion_r1415705636


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java:
##
 @@ -91,7 +91,8 @@ public EnumSet<TestKind> supportedSetupSteps() {
         return EnumSet.of(
                 TestKind.FUNCTION,
                 TestKind.SOURCE_WITH_RESTORE_DATA,
-                TestKind.SINK_WITH_RESTORE_DATA);
+                TestKind.SINK_WITH_RESTORE_DATA,
+                TestKind.SINK_WITH_DATA);

Review Comment:
   I think we can skip the changes in this file if we pass unique data in 
`UNION_TWO_SOURCES`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-25857] Add committer metrics to track the status of committables - non-breaking [flink]

2023-12-05 Thread via GitHub


pvary opened a new pull request, #23876:
URL: https://github.com/apache/flink/pull/23876

   ## What is the purpose of the change
   
   This is a revisited, non-breaking implementation of [FLIP-371: Provide initialization context for Committer creation in TwoPhaseCommittingSink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+Provide+initialization+context+for+Committer+creation+in+TwoPhaseCommittingSink).
   
   Reverts the previous version 
[c3a07f98e5d1d7624adc967932f57d31355d9ddd](https://github.com/apache/flink/commit/c3a07f98e5d1d7624adc967932f57d31355d9ddd)
 with 
[c7625d5fa62a6e9a182f39f53fb7e5626105f3b0](https://github.com/apache/flink/commit/c7625d5fa62a6e9a182f39f53fb7e5626105f3b0).
 Then adds the new implementation 
[99ec936966af527598ca49712c1263bc4aa03c15](https://github.com/apache/flink/commit/99ec936966af527598ca49712c1263bc4aa03c15)
 based on the 
[discussion](https://lists.apache.org/thread/h6nkgth838dlh5s90sd95zd6hlsxwx57) 
on the mailing list.
   
   ## Brief change log
   
   API changes:
   - `TwoPhaseCommittingSink` - new `createCommitter` method with default 
implementation and deprecation
   - `CommitterInitContext` - new object for committer initialization and 
metrics
   - `SinkCommitterMetricGroup` - new metrics group for committer related 
metrics
   - `Sink.createWriter` - parameter changed to `WriterInitContext` with 
default implementation and deprecation
   - `StatefulSink.createWriter` and `TwoPhaseCommittingSink.createWriter` 
method declarations are removed - and they fall back to the 
`Sink.createWriter`, which is backward compatible, since the return type is 
wider
   
   Implementation:
   - `InitContext` - new common context base class for `CommitterInitContext` 
and `WriterInitContext`
   - `WriterInitContext` - replacement for `Sink.InitContext`
   - `CommitterInitContext` - new context for committer creation
   - `InitContextBase` - the common base for the `WriterInitContext` and 
`CommitterInitContext` implementation
   - `Sink.createWriter(InitContext context)` is deprecated in favour of 
`createWriter(WriterInitContext context)` with default implementation
   - `Sink.createWriter(WriterInitContext context)` is created with default 
implementation which wraps the `WriterInitContext` with `InitContextWrapper` to 
the old `Sink.InitContext` and calls the original method
   - `StatefulSink.createWriter` and the `TwoPhaseCommitingSink.createWriter` 
methods are removed - they fall back to the `Sink.createWriter` method which is 
backward compatible
   - `SinkWriterOperator` - to follow the `WriterInitContext` related changes
   - `StatefulSinkWriterStateHandler` - Here we check if the writer created by 
`createWriter` is a `StatefulSinkWriter`
   - `InternalSinkCommitterMetricGroup` - implementation for 
`SinkCommitterMetricGroup`
   - `CommitterOperator` - to create the `CommitterInitContext` and 
`SinkCommitterMetricGroup`. And modify the initialization to  use the new 
`Committer` creation method, and create the metrics group
   - Updating the metrics in the `CommitRequestImpl` and 
`CheckpointCommittableManagerImpl`
   - Propagate the new metricGroup object so that it finally arrives at the `CommitRequestImpl` where we need it (`CommitterOperator` -> `CommittableCollector` -> `CheckpointCommittableManagerImpl` -> `SubtaskCommittableManager` -> `CommitRequestImpl`)
   - SinkAdapterV1 changes to use the new Committer and writer creation method, 
and metrics group (`SinkAdapterV1`, `GlobalCommitterSerializer`, 
`GlobalCommitterOperator`)
   - Updating the tests to propagate the metric group
   - Adding a new test case to `SinkV2MetricsITCase` to make sure that the 
metrics are propagated as expected.
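   
   For readers unfamiliar with the pattern, here is a compilable toy sketch of the non-breaking default-method approach used above. The types are stand-ins, not the real `org.apache.flink.api.connector.sink2` interfaces; in particular, the actual PR adapts the new context via `InitContextWrapper` rather than inheritance:
   
   ```java
   import java.io.IOException;
   
   interface InitContext {}
   interface WriterInitContext extends InitContext {}
   interface SinkWriter<T> {}
   
   interface Sink<T> {
       /** @deprecated Old entry point, kept so existing sinks still compile. */
       @Deprecated
       default SinkWriter<T> createWriter(InitContext context) throws IOException {
           throw new UnsupportedOperationException("implement createWriter(WriterInitContext)");
       }
   
       // New entry point; by default it delegates to the deprecated method,
       // so sinks that only implement the old variant keep working unchanged.
       default SinkWriter<T> createWriter(WriterInitContext context) throws IOException {
           return createWriter((InitContext) context);
       }
   }
   ```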
   
   ## Verifying this change
   
   Added a new unit test, and modified the old ones to use the new method.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? Not documented; it might be worth checking the SinkV2 documentation, if it exists, and updating it.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Flink 31966] Flink Kubernetes operator lacks TLS support [flink-kubernetes-operator]

2023-12-05 Thread via GitHub


gaborgsomogyi commented on PR #712:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/712#issuecomment-1840907216

   Sorry for the late response we've quite some tasks to do.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33480] Implement restore tests for GroupAggregate node [flink]

2023-12-05 Thread via GitHub


jnh5y commented on PR #23681:
URL: https://github.com/apache/flink/pull/23681#issuecomment-1840907198

   The CI failure is from spotless.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33667] Implement restore tests for MatchRecognize node [flink]

2023-12-05 Thread via GitHub


dawidwys commented on PR #23821:
URL: https://github.com/apache/flink/pull/23821#issuecomment-1840912550

   > FWIW, MR observes the changelog not final results. I think it should be ok 
to merge before or after the Join PR.
   
   I don't think it makes a difference. Both access static variables.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Give cluster/job role access to k8s services API [flink-kubernetes-operator]

2023-12-05 Thread via GitHub


gyfora commented on PR #668:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/668#issuecomment-1840917676

   Any update on this @sbrother ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25857] Add committer metrics to track the status of committables - non-breaking [flink]

2023-12-05 Thread via GitHub


flinkbot commented on PR #23876:
URL: https://github.com/apache/flink/pull/23876#issuecomment-1840919846

   
   ## CI report:
   
   * 99ec936966af527598ca49712c1263bc4aa03c15 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32894] Use 3.3.0 for maven-shade-plugin to support Java 17 [flink-connector-shared-utils]

2023-12-05 Thread via GitHub


snuyanzin commented on PR #20:
URL: 
https://github.com/apache/flink-connector-shared-utils/pull/20#issuecomment-1840937326

   Yep, sure.
   @PatrickRen would you mind bumping it to 3.5.1?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLIP-321] Update the docs to add migration periods for deprecated APIs. [flink]

2023-12-05 Thread via GitHub


pvary commented on code in PR #23865:
URL: https://github.com/apache/flink/pull/23865#discussion_r1415756044


##
docs/content/docs/ops/upgrading.md:
##
 @@ -60,6 +60,32 @@ Code written against a `PublicEvolving` API in 1.15.2 will continue to run in 1.
 That same code would have to be recompiled when upgrading to 1.16.0 though.
 {{< /hint >}}
 
+### Deprecated API Migration Period
+When an API is deprecated, it is marked with the `@Deprecated` annotation and a deprecation message is added to the Javadoc.
+According to [FLIP-321](https://cwiki.apache.org/confluence/display/FLINK/FLIP-321%3A+Introduce+an+API+deprecation+process),
+starting from release 1.18, each deprecated API will have a guaranteed migration period depending on the API stability level:
+
+|Annotation|  Guaranteed Migration Period   |

Review Comment:
   Maybe one more column, like:
   
   ```
   | Annotation       | Guaranteed Migration Period                    | Could be removed after the migration period |
   |:----------------:|:----------------------------------------------:|:-------------------------------------------:|
   | `Public`         | 2 minor releases                               | Next major version                          |
   | `PublicEvolving` | 1 minor release                                | Next minor version                          |
   | `Experimental`   | 1 patch release for the affected minor release | Next patch version                          |
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30400][build] Stop bundling flink-connector-base [flink-connector-pulsar]

2023-12-05 Thread via GitHub


tisonkun merged PR #61:
URL: https://github.com/apache/flink-connector-pulsar/pull/61


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33470] Implement restore tests for Join node [flink]

2023-12-05 Thread via GitHub


dawidwys commented on code in PR #23869:
URL: https://github.com/apache/flink/pull/23869#discussion_r1415790788


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/JoinTestPrograms.java:
##
@@ -107,7 +107,7 @@ public class JoinTestPrograms {
 
 static {
 NON_WINDOW_INNER_JOIN =
-TableTestProgram.of("non-window-inner-join", "test non-window 
inner join")
+TableTestProgram.of("join-non-window-inner-join", "test 
non-window inner join")

Review Comment:
   Hope you won't find it nitpicking, but can we leave just a single occurrence of `join`? I think `join-non-window-inner` still makes sense, but I am not a native speaker. If you find the duplication better, leave it as is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Flink 31966] Flink Kubernetes operator lacks TLS support [flink-kubernetes-operator]

2023-12-05 Thread via GitHub


tagarr commented on code in PR #712:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/712#discussion_r1415815965


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
 @@ -195,8 +199,25 @@ public void submitApplicationCluster(
         if (requireHaMetadata) {
             validateHaMetadataExists(conf);
         }
+        try {
+            deployApplicationCluster(jobSpec, removeOperatorConfigs(conf));
+        } catch (RuntimeException e) {
+            LOG.warn("Caught Exception " + e.getMessage());
+            if (!isValidRuntimeException(conf, e)) {
+                throw e;
+            }
+        }

Review Comment:
   I'll try to explain the problem. If the TLS config for the FlinkDeployment differs from the operator's, then on creating (deploying) a FlinkDeployment a rest client is always constructed, see https://github.com/apache/flink/blob/c5808b04fdce9ca0b705b6cc7a64666ab6426875/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L232. This client carries the TLS configuration of the deployment, which may not match that of the operator, so it will throw an exception even though the rest client is not, and never will be, used. This exception is then propagated into the CR and looks like a failure. However, after a few reconciles it corrects itself. I thought catching this erroneous exception would prevent users from thinking they have an issue with their deployment.
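   
   To make the trade-off concrete, a self-contained toy sketch of the control flow (names and exception types are illustrative, not the PR's code):
   
   ```java
   import java.util.function.Predicate;
   
   class DeployErrorHandlingSketch {
       // Swallow only the known-benign failure; rethrow everything else so
       // real deploy errors still propagate into the CR status.
       static void deployAndFilter(Runnable deploy, Predicate<RuntimeException> isBenign) {
           try {
               deploy.run();
           } catch (RuntimeException e) {
               if (!isBenign.test(e)) {
                   throw e;
               }
               // Benign case: the descriptor's eager RestClient creation failed,
               // but the deployment itself already went through.
           }
       }
   
       public static void main(String[] args) {
           deployAndFilter(
                   () -> { throw new IllegalStateException("rest client creation failed"); },
                   e -> e instanceof IllegalStateException); // stands in for isValidRuntimeException
           System.out.println("benign failure ignored, deployment considered submitted");
       }
   }
   ```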



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Flink 31966] Flink Kubernetes operator lacks TLS support [flink-kubernetes-operator]

2023-12-05 Thread via GitHub


tagarr commented on code in PR #712:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/712#discussion_r1415823972


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
 @@ -1075,4 +1106,59 @@ protected void updateStatusAfterClusterDeletion(FlinkDeploymentStatus status) {
         status.getJobStatus().setState(JobStatus.FINISHED.name());
     }
 }
+
+    private Configuration getOperatorRestConfig(Configuration origConfig) throws IOException {
+        Configuration conf = new Configuration(origConfig);
+        EnvUtils.get(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH)
+                .ifPresent(
+                        path -> {
+                            if (Files.notExists(Paths.get(path))) {
+                                return;
+                            }
+                            conf.set(
+                                    SecurityOptions.SSL_REST_TRUSTSTORE,
+                                    EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH));
+                            conf.set(
+                                    SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD,
+                                    EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD));
+                            if (SecurityOptions.isRestSSLAuthenticationEnabled(conf)
+                                    && EnvUtils.get(EnvUtils.ENV_OPERATOR_KEYSTORE_PATH)
+                                            .isPresent()) {
+                                conf.set(
+                                        SecurityOptions.SSL_REST_KEYSTORE,
+                                        EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PATH));
+                                conf.set(
+                                        SecurityOptions.SSL_REST_KEYSTORE_PASSWORD,
+                                        EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD));
+                                conf.set(
+                                        SecurityOptions.SSL_REST_KEY_PASSWORD,
+                                        EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD));
+                            } else {
+                                conf.removeConfig(SecurityOptions.SSL_REST_KEYSTORE);
+                                conf.removeConfig(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD);
+                            }
+                            conf.removeConfig(SecurityOptions.SSL_TRUSTSTORE);
+                            conf.removeConfig(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
+                            conf.removeConfig(SecurityOptions.SSL_KEYSTORE);
+                            conf.removeConfig(SecurityOptions.SSL_KEYSTORE_PASSWORD);
+                        });
+        return conf;
+    }
+
+    private boolean isValidRuntimeException(Configuration conf, RuntimeException e) {
+        final Optional<String> trustStorePath = EnvUtils.get(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH);
+        // The ClusterDescriptors always try and create a RestClient from the config
+        // that would be given to the deployment. When SSL is enabled it will throw
+        // a ClusterRetrieveException as the operator does not have the certs where they
+        // would be mounted on the client
+        if (SecurityOptions.isRestSSLEnabled(conf)
+                && trustStorePath.isPresent()
+                && Files.exists(Paths.get(trustStorePath.get()))
+                && e.getCause() instanceof ClusterRetrieveException) {
+            return true;
+        }
+        return false;

Review Comment:
   I explained above why I'm catching the exception. The check here verifies that the exception is the expected one, i.e. SSL is on, the operator has a truststore, and the exception comes from trying to create a RestClient. If we remove this, I will have to document that the TLS config for the deployment needs to be exactly the same as the operator's, i.e. mounted in the same place and using the same store password.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-05 Thread via GitHub


afedulov commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1841113426

   @schulzp thanks for the contribution. The approach looks solid. I believe what is missing is a set of tests checking that the non-default inspector is actually used when set, and that the MetricGroup is propagated to it correctly.
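   
   For example, a rough sketch of the shape such a test could take; the inspector interface and its methods below are placeholders, not the connector's confirmed API:
   
   ```java
   import java.util.concurrent.atomic.AtomicBoolean;
   
   // Placeholder shapes standing in for whatever this PR introduces.
   interface MetricGroupStub {}
   interface BulkFailureInspector {
       void open(MetricGroupStub metricGroup);
       void inspectFailure(Throwable failure);
   }
   
   class RecordingInspectorSketch {
       public static void main(String[] args) {
           AtomicBoolean sawMetricGroup = new AtomicBoolean(false);
           AtomicBoolean wasUsed = new AtomicBoolean(false);
   
           BulkFailureInspector recording = new BulkFailureInspector() {
               @Override
               public void open(MetricGroupStub metricGroup) {
                   sawMetricGroup.set(metricGroup != null);
               }
   
               @Override
               public void inspectFailure(Throwable failure) {
                   wasUsed.set(true);
               }
           };
   
           // In a real test the sink builder would install the inspector and a
           // failing bulk request would drive it; here we only show the assertions.
           recording.open(new MetricGroupStub() {});
           recording.inspectFailure(new RuntimeException("bulk failed"));
   
           if (!sawMetricGroup.get() || !wasUsed.get()) {
               throw new AssertionError("custom inspector not wired up correctly");
           }
       }
   }
   ```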


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33470] Implement restore tests for Join node [flink]

2023-12-05 Thread via GitHub


jnh5y commented on code in PR #23869:
URL: https://github.com/apache/flink/pull/23869#discussion_r1415899730


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/JoinTestPrograms.java:
##
@@ -107,7 +107,7 @@ public class JoinTestPrograms {
 
 static {
 NON_WINDOW_INNER_JOIN =
-TableTestProgram.of("non-window-inner-join", "test non-window 
inner join")
+TableTestProgram.of("join-non-window-inner-join", "test 
non-window inner join")

Review Comment:
   Yeah, I don't like the duplication either...  Both options are unfortunate; 
I find this one slightly better.  Can we keep it as-is?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33755] Cleanup usage of deprecated StreamExecutionEnvironment#generateSequence [flink]

2023-12-05 Thread via GitHub


snuyanzin commented on PR #23875:
URL: https://github.com/apache/flink/pull/23875#issuecomment-1841156149

   Thanks for taking a look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-33755) Cleanup usage of deprecated StreamExecutionEnvironment#generateSequence

2023-12-05 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin closed FLINK-33755.
---

> Cleanup usage of deprecated StreamExecutionEnvironment#generateSequence
> ---
>
> Key: FLINK-33755
> URL: https://issues.apache.org/jira/browse/FLINK-33755
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33755] Cleanup usage of deprecated StreamExecutionEnvironment#generateSequence [flink]

2023-12-05 Thread via GitHub


snuyanzin merged PR #23875:
URL: https://github.com/apache/flink/pull/23875


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-33755) Cleanup usage of deprecated StreamExecutionEnvironment#generateSequence

2023-12-05 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin resolved FLINK-33755.
-
Resolution: Fixed

> Cleanup usage of deprecated StreamExecutionEnvironment#generateSequence
> ---
>
> Key: FLINK-33755
> URL: https://issues.apache.org/jira/browse/FLINK-33755
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33755) Cleanup usage of deprecated StreamExecutionEnvironment#generateSequence

2023-12-05 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793352#comment-17793352
 ] 

Sergey Nuyanzin commented on FLINK-33755:
-

Merged to master as 
[e74592ca92f4eac5bef6e5140ae4e8cc2f0bf1a1|https://github.com/apache/flink/commit/e74592ca92f4eac5bef6e5140ae4e8cc2f0bf1a1]

> Cleanup usage of deprecated StreamExecutionEnvironment#generateSequence
> ---
>
> Key: FLINK-33755
> URL: https://issues.apache.org/jira/browse/FLINK-33755
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33666][table] Use the same constraint name in MergeTableLikeUtil and Schema [flink]

2023-12-05 Thread via GitHub


jeyhunkarimov opened a new pull request, #23877:
URL: https://github.com/apache/flink/pull/23877

   ## What is the purpose of the change
   
   The purpose of this change is to make the constraint generation mechanism 
the same in MergeTableLikeUtil and Schema
   
   ## Brief change log
   
 - Change the constraint generation in `MergeTableLikeUtil` to be the same 
as in `Schema`
 - Fix the related tests
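   
   For illustration, a small sketch of the deterministic naming scheme this aligns on; the `PK_` prefix and underscore joining mirror `Schema`'s behavior as described in FLINK-33666, but the helper itself is hypothetical:
   
   ```java
   import java.util.List;
   
   final class ConstraintNamesSketch {
       // Derive the constraint name from the column names instead of a
       // JVM-dependent hashCode, so it is stable across runs and JVMs.
       static String primaryKeyName(List<String> columns) {
           return "PK_" + String.join("_", columns);
       }
   
       public static void main(String[] args) {
           System.out.println(primaryKeyName(List.of("a", "b"))); // always PK_a_b
       }
   }
   ```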
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
`MergeTableLikeUtilTest`
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency):  no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive):  no 
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no 
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33666) MergeTableLikeUtil uses different constraint name than Schema

2023-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33666:
---
Labels: pull-request-available  (was: )

> MergeTableLikeUtil uses different constraint name than Schema
> -
>
> Key: FLINK-33666
> URL: https://issues.apache.org/jira/browse/FLINK-33666
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> {{MergeTableLikeUtil}} uses a different algorithm to name constraints than 
> {{Schema}}. 
> {{Schema}} includes the column names.
> {{MergeTableLikeUtil}} uses a hashCode which means it might depend on JVM 
> specifics.
> For consistency we should use the same algorithm. I propose to use 
> {{Schema}}'s logic.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33666][table] Use the same constraint name in MergeTableLikeUtil and Schema [flink]

2023-12-05 Thread via GitHub


flinkbot commented on PR #23877:
URL: https://github.com/apache/flink/pull/23877#issuecomment-1841235550

   
   ## CI report:
   
   * 0fc7e4da09365d5a524a68a715f8c7961d522aac UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Flink 31966] Flink Kubernetes operator lacks TLS support [flink-kubernetes-operator]

2023-12-05 Thread via GitHub


tagarr commented on PR #712:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/712#issuecomment-1841240447

   So if you prefer not catching the exception, I can just add a note to the example README about keeping the SSL location and password consistent; otherwise you may see exceptions being thrown in the operator.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33470] Implement restore tests for Join node [flink]

2023-12-05 Thread via GitHub


jnh5y commented on code in PR #23869:
URL: https://github.com/apache/flink/pull/23869#discussion_r1416020844


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/JoinTestPrograms.java:
##
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.testutils;
+
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecJoin}. */
+public class JoinTestPrograms {
+    static final TableTestProgram NON_WINDOW_INNER_JOIN;
+    static final TableTestProgram NON_WINDOW_INNER_JOIN_WITH_NULL;
+    static final TableTestProgram CROSS_JOIN;
+    static final TableTestProgram JOIN_WITH_FILTER;
+    static final TableTestProgram INNER_JOIN_WITH_DUPLICATE_KEY;
+    static final TableTestProgram INNER_JOIN_WITH_NON_EQUI_JOIN;
+    static final TableTestProgram INNER_JOIN_WITH_EQUAL_PK;
+    static final TableTestProgram INNER_JOIN_WITH_PK;
+    static final TableTestProgram LEFT_JOIN;
+    static final TableTestProgram FULL_OUTER;
+    static final TableTestProgram RIGHT_JOIN;
+    static final TableTestProgram SEMI_JOIN;
+    static final TableTestProgram ANTI_JOIN;
+
+    static final SourceTestStep EMPLOYEE =
+            SourceTestStep.newBuilder("EMPLOYEE")
+                    .addSchema("deptno int", "salary bigint", "name varchar")
+                    .addOption("filterable-fields", "salary")
+                    .producedBeforeRestore(
+                            Row.of(null, 101L, "Adam"),
+                            Row.of(1, 1L, "Baker"),
+                            Row.of(2, 2L, "Charlie"),
+                            Row.of(3, 2L, "Don"),
+                            Row.of(7, 6L, "Victor"))
+                    .producedAfterRestore(
+                            Row.of(4, 3L, "Juliet"),
+                            Row.of(4, 4L, "Helena"),
+                            Row.of(1, 1L, "Ivana"))
+                    .build();
+
+    static final SourceTestStep DEPARTMENT =
+            SourceTestStep.newBuilder("DEPARTMENT")
+                    .addSchema(
+                            "department_num int", "b2 bigint", "b3 int", "department_name varchar")
+                    .producedBeforeRestore(
+                            Row.of(null, 102L, 0, "Accounting"),
+                            Row.of(1, 1L, 0, "Research"),
+                            Row.of(2, 2L, 1, "Human Resources"),
+                            Row.of(2, 3L, 2, "HR"),
+                            Row.of(3, 1L, 2, "Sales"))
+                    .producedAfterRestore(
+                            Row.of(2, 4L, 3, "People Operations"), Row.of(4, 2L, 4, "Engineering"))
+                    .build();
+
+    static final SourceTestStep DEPARTMENT_NONULLS =
+            SourceTestStep.newBuilder("DEPARTMENT")
+                    .addSchema(
+                            "department_num int", "b2 bigint", "b3 int", "department_name varchar")
+                    .producedBeforeRestore(
+                            Row.of(1, 1L, 0, "Research"),
+                            Row.of(2, 2L, 1, "Human Resources"),
+                            Row.of(2, 3L, 2, "HR"),
+                            Row.of(3, 1L, 2, "Sales"))
+                    .producedAfterRestore(Row.of(2, 4L, 3, "People Operations"))
+                    .build();
+    static final SourceTestStep SOURCE_T1 =
+            SourceTestStep.newBuilder("T1")
+                    .addSchema("a int", "b bigint", "c varchar")
+                    .producedBeforeRestore(
+                            Row.of(1, 1L, "Baker1"),
+                            Row.of(1, 2L, "Baker2"),
+                            Row.of(1, 2L, "Baker2"),
+                            Row.of(1, 5L, "Baker3"),
+                            Row.of(2, 7L, "Baker5"),
+                            Row.of(1, 9L, "Baker6"),