Re: [PR] [FLINK-33929][jdbc-connector] Support JDBC String field read Fragment read [flink-connector-jdbc]

2024-01-10 Thread via GitHub


MartijnVisser commented on PR #87:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/87#issuecomment-1886531861

   @zhilinli123 I still don't understand the Jira ticket, so I can't really 
review it. Perhaps @snuyanzin or @eskabetxe understand the goal of this PR. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34040) ScalaSerializersMigrationTest.testStableAnonymousClassnameGeneration fails in GHA with JDK 17 and 21

2024-01-10 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805414#comment-17805414
 ] 

Matthias Pohl commented on FLINK-34040:
---

Yeah no. [~snuyanzin] also already gave the hint that it might be related to 
the Scala version that's used in the GitHub workflow profiles. I don't do 
anything related to changing the Scala version. I have to double-check how we 
do it in Azure and apply the same for GHA.

> ScalaSerializersMigrationTest.testStableAnonymousClassnameGeneration fails in 
> GHA with JDK 17 and 21
> 
>
> Key: FLINK-34040
> URL: https://issues.apache.org/jira/browse/FLINK-34040
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Scala, Build System / CI
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: github-actions, test-stability
>
> {code}
> Error: 13:05:23 13:05:23.538 [ERROR] Tests run: 1, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 0.375 s <<< FAILURE! -- in 
> org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest
> Error: 13:05:23 13:05:23.538 [ERROR] 
> org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest.testStableAnonymousClassnameGeneration
>  -- Time elapsed: 0.371 s <<< FAILURE!
> Jan 07 13:05:23 org.junit.ComparisonFailure: 
> expected:<...MigrationTest$$anon$[8]> but was:<...MigrationTest$$anon$[1]>
> Jan 07 13:05:23   at org.junit.Assert.assertEquals(Assert.java:117)
> Jan 07 13:05:23   at org.junit.Assert.assertEquals(Assert.java:146)
> Jan 07 13:05:23   at 
> org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest.testStableAnonymousClassnameGeneration(ScalaSerializersMigrationTest.scala:60)
> Jan 07 13:05:23   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:580)
> {code}
> The error only happens in the [master GHA 
> nightly|https://github.com/XComp/flink/actions/workflows/nightly-dev.yml] for 
> JDK 17 and 21.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34007) Flink Job stuck in suspend state after losing leadership in HA Mode

2024-01-10 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805411#comment-17805411
 ] 

Matthias Pohl edited comment on FLINK-34007 at 1/11/24 7:36 AM:


I still don't fully understand the error you shared: Shouldn't the 
KubernetesClientException resolve itself because the logic runs in a loop? Is 
this stacktrace you shared only a one-time thing or does it reoccur (which 
would confirm the execution in the loop and indicate that the ConfigMap is in 
some odd state)? Another thing I'm wondering is why the ConfigMap was 
concurrently updated (which caused the KubernetesClientException as far as I 
understand) when there's only one JM running. Are there other processes 
accessing the ConfigMap?

{quote}
[...] flink will not able to restart services (such RM and dispatcher) as 
DefaultLeaderRetrievalService is stopped also [...]
{quote}
The DefaultLeaderRetrievalService is not in charge of restarting any services. 
The LeaderElectionService will trigger the restart of any shut down services 
(in our case the SessionDispatcherLeaderProcess which would be started by the 
DefaultDispatcherRunner; the latter one maintains the Dispatcher's leader 
election) as soon as the JobManager gets the leadership again.


was (Author: mapohl):
I still don't fully understand the error you shared: Shouldn't the 
KubernetesClientException resolve itself because the logic runs in a loop? Is 
this stacktrace you shared only a one-time thing or does it reoccur (which 
would confirm the execution in the loop and indicate that the ConfigMap is in 
some odd state)? Another thing I'm wondering is why the ConfigMap was 
concurrently updated (which caused the KubernetesClientException as far as I 
understand) when there's only one JM running. Are there other processes 
accessing the ConfigMap?

{quote}
[...] flink will not able to restart services (such RM and dispatcher) as 
DefaultLeaderRetrievalService is stopped also [...]
{quote}
The DefaultLeaderRetrievalService is not in charge of restarting any services. 
The LeaderElectionService will trigger the restart of any shut down services 
(in that case the SessionDispatcherLeaderProcess which would be started by the 
DefaultDispatcherRunner; the latter one maintains the Dispatcher's leader 
election) as soon as the JobManager gets the leadership again.

> Flink Job stuck in suspend state after losing leadership in HA Mode
> ---
>
> Key: FLINK-34007
> URL: https://issues.apache.org/jira/browse/FLINK-34007
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.3, 1.17.2, 1.18.1, 1.18.2
>Reporter: Zhenqiu Huang
>Priority: Major
> Attachments: Debug.log, job-manager.log
>
>
> The observation is that Job manager goes to suspend state with a failed 
> container not able to register itself to resource manager after timeout.
> JM Log, see attached
>  





[jira] [Commented] (FLINK-34007) Flink Job stuck in suspend state after losing leadership in HA Mode

2024-01-10 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805411#comment-17805411
 ] 

Matthias Pohl commented on FLINK-34007:
---

I still don't fully understand the error you shared: Shouldn't the 
KubernetesClientException resolve itself because the logic runs in a loop? Is 
this stacktrace you shared only a one-time thing or does it reoccur (which 
would confirm the execution in the loop and indicate that the ConfigMap is in 
some odd state)? Another thing I'm wondering is why the ConfigMap was 
concurrently updated (which caused the KubernetesClientException as far as I 
understand) when there's only one JM running. Are there other processes 
accessing the ConfigMap?

{quote}
[...] flink will not able to restart services (such RM and dispatcher) as 
DefaultLeaderRetrievalService is stopped also [...]
{quote}
The DefaultLeaderRetrievalService is not in charge of restarting any services. 
The LeaderElectionService will trigger the restart of any shut down services 
(in that case the SessionDispatcherLeaderProcess which would be started by the 
DefaultDispatcherRunner; the latter one maintains the Dispatcher's leader 
election) as soon as the JobManager gets the leadership again.

> Flink Job stuck in suspend state after losing leadership in HA Mode
> ---
>
> Key: FLINK-34007
> URL: https://issues.apache.org/jira/browse/FLINK-34007
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.3, 1.17.2, 1.18.1, 1.18.2
>Reporter: Zhenqiu Huang
>Priority: Major
> Attachments: Debug.log, job-manager.log
>
>
> The observation is that Job manager goes to suspend state with a failed 
> container not able to register itself to resource manager after timeout.
> JM Log, see attached
>  





Re: [PR] [FLINK-33879] Avoids the potential hang of Hybrid Shuffle during redistribution [flink]

2024-01-10 Thread via GitHub


reswqa commented on code in PR #23957:
URL: https://github.com/apache/flink/pull/23957#discussion_r1448417073


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##
@@ -385,7 +395,7 @@ private boolean shouldReclaimBuffersBeforeRequesting(long 
delayForNextCheckMs) {
 
 /** Note that this method may be called by the netty thread. */
 private void recycleBuffer(Object owner, MemorySegment buffer) {
-if (numRequestedBuffers.get() <= bufferPool.getNumBuffers()) {
+if (!isReleased.get() && numRequestedBuffers.get() <= 
bufferPool.getNumBuffers()) {

Review Comment:
   If the downstream task thread passes this check but has not yet reached the 
next line, and the upstream thread then runs and completes the `release`, is it 
safe enough here? 🤔 
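
   To make the interleaving concrete, here is a minimal sketch of one way to close such a check-then-act window. It is not the PR's code: `MemoryManagerSketch`, its fields, and the string "buffers" are hypothetical stand-ins, and a plain lock replaces whatever synchronization the real class uses. The point is only that the released check and the enqueue happen under the same lock that `release` takes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for the memory manager; all names are illustrative only. */
final class MemoryManagerSketch {
    private final Object lock = new Object();
    private final Queue<String> bufferQueue = new ArrayDeque<>();
    private boolean released;   // guarded by lock
    private int returnedToPool; // guarded by lock

    /** Recycles a buffer: into the local queue while live, otherwise back to the pool. */
    void recycleBuffer(String buffer) {
        synchronized (lock) {
            // The check and the enqueue are atomic, so release() cannot run in between.
            if (!released) {
                bufferQueue.add(buffer);
            } else {
                returnedToPool++;
            }
        }
    }

    /** Marks the manager as released and hands every queued buffer back to the pool. */
    void release() {
        synchronized (lock) {
            released = true;
            returnedToPool += bufferQueue.size();
            bufferQueue.clear();
        }
    }

    int queuedBuffers() {
        synchronized (lock) {
            return bufferQueue.size();
        }
    }

    int returnedToPool() {
        synchronized (lock) {
            return returnedToPool;
        }
    }
}
```

   With only an atomic flag and no shared lock, a recycling thread could still observe "not released", be descheduled, and enqueue after `release` has drained the queue; whether that matters here depends on what `release` does with the queue afterwards.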






Re: [PR] [FLINK-34015][checkpoint] fix that passing ignore-unclaimed-state through dynamic props does not take effect [flink]

2024-01-10 Thread via GitHub


xiangforever2014 commented on code in PR #24058:
URL: https://github.com/apache/flink/pull/24058#discussion_r1448416625


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java:
##
@@ -163,14 +163,16 @@ public static SavepointRestoreSettings forPath(
 public static void toConfiguration(
 final SavepointRestoreSettings savepointRestoreSettings,
 final Configuration configuration) {
-configuration.set(
-SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
-savepointRestoreSettings.allowNonRestoredState());
-configuration.set(
-SavepointConfigOptions.RESTORE_MODE, 
savepointRestoreSettings.getRestoreMode());
-final String savepointPath = savepointRestoreSettings.getRestorePath();
-if (savepointPath != null) {
-configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, 
savepointPath);
+if (!savepointRestoreSettings.equals(SavepointRestoreSettings.none())) 
{

Review Comment:
   Yes, I also noticed this behavior, and it seems a little odd: if we pass 
`SavepointRestoreSettings.none()` to `toConfiguration`, it only writes 
`SAVEPOINT_IGNORE_UNCLAIMED_STATE` and `RESTORE_MODE` to the config; the 
savepoint path is not written. 
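
   As a rough illustration of the guard added in the diff above, the sketch below models the config as a plain map. Everything in it is an assumption made for illustration: `RestoreSettingsSketch`, the `NONE` sentinel, and the two string keys are hypothetical and are not Flink's classes or option names. It shows why skipping the write for "none" keeps a value that was set some other way, for example through dynamic properties.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical, simplified model of savepoint restore settings; not Flink's class. */
final class RestoreSettingsSketch {
    /** Sentinel meaning "no restore settings were given". */
    static final RestoreSettingsSketch NONE = new RestoreSettingsSketch(null, false);

    final String restorePath; // null when there is no savepoint to restore from
    final boolean allowNonRestoredState;

    RestoreSettingsSketch(String restorePath, boolean allowNonRestoredState) {
        this.restorePath = restorePath;
        this.allowNonRestoredState = allowNonRestoredState;
    }

    /** Copies the settings into config; leaves config untouched for NONE. */
    static void toConfiguration(RestoreSettingsSketch settings, Map<String, String> config) {
        if (settings.equals(NONE)) {
            return; // keep values supplied some other way, e.g. dynamic properties
        }
        // Illustrative keys, not Flink's actual option names.
        config.put("ignore-unclaimed-state", String.valueOf(settings.allowNonRestoredState));
        if (settings.restorePath != null) {
            config.put("savepoint-path", settings.restorePath);
        }
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof RestoreSettingsSketch)) {
            return false;
        }
        RestoreSettingsSketch other = (RestoreSettingsSketch) o;
        return allowNonRestoredState == other.allowNonRestoredState
                && Objects.equals(restorePath, other.restorePath);
    }

    @Override
    public int hashCode() {
        return Objects.hash(restorePath, allowNonRestoredState);
    }
}
```

   Without the early return, the `NONE` case would overwrite a pre-set `ignore-unclaimed-state=true` with `false`, which matches the symptom in the PR title.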






Re: [PR] [FLINK-33929][jdbc-connector] Support JDBC String field read Fragment read [flink-connector-jdbc]

2024-01-10 Thread via GitHub


zhilinli123 commented on PR #87:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/87#issuecomment-1886457713

   PTAL: @MartijnVisser thanks !





Re: [PR] [FLINK-33879] Avoids the potential hang of Hybrid Shuffle during redistribution [flink]

2024-01-10 Thread via GitHub


jiangxin369 commented on code in PR #23957:
URL: https://github.com/apache/flink/pull/23957#discussion_r1448384184


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java:
##
@@ -187,6 +187,7 @@ public void finish() throws IOException {
 
 @Override
 public void close() {
+storageMemoryManager.release();

Review Comment:
   To make sure the buffers are not recycled to `bufferQueue`, I added a check 
of `isReleased` state. If the memory manager is released, all buffers would be 
recycled to the buffer pool.






Re: [PR] [FLINK-33879] Avoids the potential hang of Hybrid Shuffle during redistribution [flink]

2024-01-10 Thread via GitHub


jiangxin369 commented on PR #23957:
URL: https://github.com/apache/flink/pull/23957#issuecomment-1886429024

   @reswqa Thanks for the review, I've updated the PR, please take a look.





Re: [PR] [FLINK-20281][table] support consuming cdc stream about window tvf aggregate [flink]

2024-01-10 Thread via GitHub


LadyForest commented on code in PR #24030:
URL: https://github.com/apache/flink/pull/24030#discussion_r1448344823


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala:
##
@@ -756,6 +756,147 @@ object TestData {
 row("2020-10-10 00:00:34", 1, 3d, 3f, new JBigDecimal("3.33"), 
"Comment#3", "b")
   )
 
+  val windowChangelogDataWithTimestamp: Seq[Row] = List(

Review Comment:
   +----+---------------------+----+------+------+-------+------------+---+
   | Op | Timestamp           | 1  | 2    | 3    | 4     | 5          | 6 |
   +----+---------------------+----+------+------+-------+------------+---+
   | +I | 2020-10-10 00:00:01 | 1  | 1.0  | 1.0  | 1.11  | Hi         | a |
   | +I | 2020-10-10 00:00:02 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
   | -D | 2020-10-10 00:00:03 | 1  | 1.0  | 1.0  | 1.11  | Hi         | a |
   | +I | 2020-10-10 00:00:03 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
   | +I | 2020-10-10 00:00:04 | 5  | 5.0  | 5.0  | 5.55  | null       | a |
   | -U | 2020-10-10 00:00:04 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
   | +U | 2020-10-10 00:00:04 | 22 | 22.0 | 22.2 | 22.22 | Comment#22 | a |
   | +I | 2020-10-10 00:00:07 | 3  | 3.0  | 3.0  | null  | Hello      | b |
   | +I | 2020-10-10 00:00:06 | 6  | 6.0  | 6.0  | 6.66  | Hi         | b |
   | +I | 2020-10-10 00:00:08 | 3  | null | 3.0  | 3.33  | Comment#2  | a |
   | +I | 2020-10-10 00:00:04 | 5  | 5.0  | null | 5.55  | Hi         | a |
   | +I | 2020-10-10 00:00:16 | 4  | 4.0  | 4.0  | 4.44  | Hi         | b |
   | -D | 2020-10-10 00:00:04 | 5  | 5.0  | 5.0  | 5.55  | null       | a |
   | +I | 2020-10-10 00:00:38 | 8  | 8.0  | 8.0  | 8.88  | Comment#4  | b |
   | -D | 2020-10-10 00:00:39 | 8  | 8.0  | 8.0  | 8.88  | Comment#4  | b |
   +----+---------------------+----+------+------+-------+------------+---+
   






Re: [PR] [FLINK-20281][table] support consuming cdc stream about window tvf aggregate [flink]

2024-01-10 Thread via GitHub


LadyForest commented on code in PR #24030:
URL: https://github.com/apache/flink/pull/24030#discussion_r1448379812


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala:
##
@@ -756,6 +756,147 @@ object TestData {
 row("2020-10-10 00:00:34", 1, 3d, 3f, new JBigDecimal("3.33"), 
"Comment#3", "b")
   )
 
+  val windowChangelogDataWithTimestamp: Seq[Row] = List(

Review Comment:
   ```
   +----+---------------------+----+------+------+-------+------------+---+
   | Op | Timestamp           | 1  | 2    | 3    | 4     | 5          | 6 |
   +----+---------------------+----+------+------+-------+------------+---+
   | +I | 2020-10-10 00:00:01 | 1  | 1.0  | 1.0  | 1.11  | Hi         | a |
   | +I | 2020-10-10 00:00:02 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
   | -D | 2020-10-10 00:00:03 | 1  | 1.0  | 1.0  | 1.11  | Hi         | a |
   | +I | 2020-10-10 00:00:03 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
   | +I | 2020-10-10 00:00:04 | 5  | 5.0  | 5.0  | 5.55  | null       | a |
   | -U | 2020-10-10 00:00:04 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
   | +U | 2020-10-10 00:00:04 | 22 | 22.0 | 22.2 | 22.22 | Comment#22 | a |
   | +I | 2020-10-10 00:00:07 | 3  | 3.0  | 3.0  | null  | Hello      | b |
   | +I | 2020-10-10 00:00:06 | 6  | 6.0  | 6.0  | 6.66  | Hi         | b |
   | +I | 2020-10-10 00:00:08 | 3  | null | 3.0  | 3.33  | Comment#2  | a |
   | +I | 2020-10-10 00:00:04 | 5  | 5.0  | null | 5.55  | Hi         | a |
   | +I | 2020-10-10 00:00:16 | 4  | 4.0  | 4.0  | 4.44  | Hi         | b |
   | -D | 2020-10-10 00:00:04 | 5  | 5.0  | 5.0  | 5.55  | null       | a |
   | +I | 2020-10-10 00:00:38 | 8  | 8.0  | 8.0  | 8.88  | Comment#4  | b |
   | -D | 2020-10-10 00:00:39 | 8  | 8.0  | 8.0  | 8.88  | Comment#4  | b |
   +----+---------------------+----+------+------+-------+------------+---+
   ```
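
   For readers less familiar with the Op column, the sketch below shows how such a changelog can be folded into a per-key row count: `+I` and `+U` add a row, `-D` and `-U` retract one. It is only an illustration under simplifying assumptions. `ChangelogCountSketch` is a hypothetical helper, each event is reduced to an op and a key, and window assignment is ignored entirely.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that folds changelog events into a row count per key. */
final class ChangelogCountSketch {
    private ChangelogCountSketch() {}

    /**
     * Each event is a two-element array: {op, key}.
     * Accumulating ops (+I, +U) count as +1, retracting ops (-D, -U) as -1.
     */
    static Map<String, Long> countPerKey(String[][] changelog) {
        Map<String, Long> counts = new HashMap<>();
        for (String[] event : changelog) {
            String op = event[0];
            String key = event[1];
            long delta;
            switch (op) {
                case "+I":
                case "+U":
                    delta = 1L;
                    break;
                case "-D":
                case "-U":
                    delta = -1L;
                    break;
                default:
                    throw new IllegalArgumentException("Unknown op: " + op);
            }
            counts.merge(key, delta, Long::sum);
        }
        return counts;
    }
}
```

   An aggregate that consumes such input has to be able to retract, which is presumably what the `needRetraction` flag in this PR controls.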






[jira] [Commented] (FLINK-33932) Support Retry Mechanism in RocksDBStateDataTransfer

2024-01-10 Thread xiangyu feng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805398#comment-17805398
 ] 

xiangyu feng commented on FLINK-33932:
--

[~masteryhx] Thx for ur reply, I've talked to [~dianer17] to update the 
description of this issue. Would you kindly assign this issue to me? Also, I 
would like to hear more from you about this issue in the discussion thread of 
FLIP-414: [https://lists.apache.org/thread/om4kgd6trx2lctwm6x92q2kdjngxtz9k]
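
To make the discussion more concrete, a retry wrapper of the kind the ticket asks for could look roughly like the sketch below. This is an assumption for illustration only, not the design from FLIP-414: `RetrySketch`, `callWithRetry`, and its parameters are hypothetical names, only `IOException` is treated as retryable, and a fixed backoff stands in for whatever policy is eventually chosen.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical retry helper; names and policy are illustrative only. */
final class RetrySketch {
    private RetrySketch() {}

    /**
     * Runs the action up to maxAttempts times, retrying only on IOException
     * and sleeping backoffMillis between attempts. The last IOException is
     * rethrown when every attempt fails; other exceptions propagate at once.
     */
    static <T> T callWithRetry(Callable<T> action, int maxAttempts, long backoffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (IOException e) {
                last = e; // remember the failure and retry if attempts remain
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis);
                }
            }
        }
        throw last;
    }
}
```

An upload or download of a single state file would be passed in as the `Callable`, so a transient filesystem error fails one attempt instead of the whole checkpoint.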

> Support Retry Mechanism in RocksDBStateDataTransfer
> ---
>
> Key: FLINK-33932
> URL: https://issues.apache.org/jira/browse/FLINK-33932
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Guojun Li
>Priority: Major
>  Labels: pull-request-available
>
> Currently, there is no retry mechanism for downloading and uploading RocksDB 
> state files. Any jittering of remote filesystem might lead to a checkpoint 
> failure. By supporting retry mechanism in RocksDBStateDataTransfer, we can 
> significantly reduce the failure rate of checkpoint during asynchronous phase.
> The exception is as below:
> {noformat}
>  
> 2023-12-19 08:46:00,197 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline 
> checkpoint 2 by task 
> 5b008c2c048fa8534d648455e69f9497_fbcce2f96483f8f15e87dc6c9afd372f_183_0 of 
> job a025f19e at 
> application-6f1c6e3d-1702480803995-5093022-taskmanager-1-1 @ 
> fdbd:dc61:1a:101:0:0:0:36 (dataPort=38789).
> org.apache.flink.util.SerializedThrowable: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task 
> checkpoint failed.
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  ~[?:?]
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  ~[?:?]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: 
> Could not materialize checkpoint 2 for operator GlobalGroupAggregate[132] -> 
> Calc[133] (184/500)#0.
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     ... 4 more
> Caused by: org.apache.flink.util.SerializedThrowable: 
> java.util.concurrent.ExecutionException: java.io.IOException: Could not flush 
> to file and close the file system output stream to 
> hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the 
> stream state handle
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
>     at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
>     at 
> org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: 
> Could not flush to file and close the file system output stream to 
> hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the 
> stream state handle
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:516)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:157)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:113)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.ja

Re: [PR] [FLINK-20281][table] support consuming cdc stream about window tvf aggregate [flink]

2024-01-10 Thread via GitHub


LadyForest commented on code in PR #24030:
URL: https://github.com/apache/flink/pull/24030#discussion_r1446810962


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala:
##
@@ -279,19 +279,19 @@ object AggregateUtil extends Enumeration {
   typeFactory: FlinkTypeFactory,
   inputRowType: RowType,
   aggCalls: Seq[AggregateCall],
+  needRetraction: Boolean,
   windowSpec: WindowSpec,
   isStateBackendDataViews: Boolean): AggregateInfoList = {
-// Hopping window requires additional COUNT(*) to determine  whether to 
register next timer
-// through whether the current fired window is empty, see 
SliceSharedWindowAggProcessor.
-val needInputCount = windowSpec.isInstanceOf[HoppingWindowSpec]
+// Hopping window always requires additional COUNT(*) to determine  
whether to register next

Review Comment:
   Nit: remove extra whitespace between "determine" and "whether"



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala:
##
@@ -219,8 +219,13 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
 val providedTrait = new ModifyKindSetTrait(builder.build())
 createNewNode(window, children, providedTrait, requiredTrait, 
requester)
 
-  case _: StreamPhysicalWindowAggregate | _: StreamPhysicalWindowRank |
-  _: StreamPhysicalWindowDeduplicate =>
+  case window: StreamPhysicalWindowAggregate =>
+// WindowAggregate and WindowTableAggregate support all changes in 
input
+val children = visitChildren(window, ModifyKindSetTrait.ALL_CHANGES)
+val providedTrait = new ModifyKindSetTrait(ModifyKindSet.INSERT_ONLY)

Review Comment:
   Nit: Add a TODO noting that the provided trait set can be extended once we 
support an emit strategy with the TVF syntax.



##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala:
##
@@ -127,6 +162,26 @@ class WindowAggregateTest(aggPhaseEnforcer: 
AggregatePhaseStrategy) extends Tabl
 util.verifyRelPlan(sql)
   }
 
+  @TestTemplate
+  def testTumble_OnProctimeWithCDCSource(): Unit = {
+assumeThat(isTwoPhase).isTrue

Review Comment:
   Is there any particular reason to enable only the two-phase test?



##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java:
##
@@ -231,4 +239,33 @@ protected void collect(RowData aggResult) {
 reuseOutput.replace(ctx.getKeyedStateBackend().getCurrentKey(), 
aggResult);
 ctx.output(reuseOutput);
 }
+
+/** A supplier that returns whether the window is empty. */
+protected final class WindowIsEmptySupplier implements Supplier<Boolean>, 
Serializable {
+private static final long serialVersionUID = 1L;
+
+private final int indexOfCountStar;
+
+private WindowIsEmptySupplier(int indexOfCountStar, SliceAssigner 
assigner) {
+if (assigner instanceof SliceAssigners.HoppingSliceAssigner) {
+checkArgument(
+indexOfCountStar >= 0,
+"Hopping window requires a COUNT(*) in the aggregate 
functions.");
+}
+this.indexOfCountStar = indexOfCountStar;
+}
+
+@Override
+public Boolean get() {
+if (indexOfCountStar < 0) {

Review Comment:
   I saw there is a precondition check on `indexOfCountStar`. Under which 
condition might it be negative?



##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala:
##
@@ -756,6 +756,147 @@ object TestData {
 row("2020-10-10 00:00:34", 1, 3d, 3f, new JBigDecimal("3.33"), 
"Comment#3", "b")
   )
 
+  val windowChangelogDataWithTimestamp: Seq[Row] = List(

Review Comment:
   +----+---------------------+----+------+------+-------+------------+---+
   | Op | Timestamp           | 1  | 2    | 3    | 4     | 5          | 6 |
   +----+---------------------+----+------+------+-------+------------+---+
   | +I | 2020-10-10 00:00:01 | 1  | 1.0  | 1.0  | 1.11  | Hi         | a |
   | +I | 2020-10-10 00:00:02 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
   | -D | 2020-10-10 00:00:03 | 1  | 1.0  | 1.0  | 1.11  | Hi         | a |
   | +I | 2020-10-10 00:00:03 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
   | +I | 2020-10-10 00:00:04 | 5  | 5.0  | 5.0  | 5.55  | null       | a |
   | -U | 2020-10-10 00:00:04 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
   | +U | 2020-10-10 00:00:04 | 22 | 22.0 | 22.2 | 22.22 | Comment#22 | a |
   | +I | 2020-10-10 00:00:07 | 3  | 3.0  | 3.0  | null  | Hello      | b |
   | +I | 2020-10-10 00:00:06 | 6  | 6.0  | 6.0  | 6.66  | Hi         | b |
   | +I | 2020-10-10 00:00:08 | 3  | null | 3.0  | 3.33  | Comment#2  | a |
   | +I |

[jira] [Commented] (FLINK-34050) Rocksdb state has space amplification after rescaling with DeleteRange

2024-01-10 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805392#comment-17805392
 ] 

Yue Ma commented on FLINK-34050:


Hi [~lijinzhong]  

Thanks for reporting this issue. We have also encountered it before. I think 
this is a great suggestion.

Overall, this is still a trade-off between time and space:

If recovery time matters most, we can use deleteRange alone.

If we want both good recovery time and low space amplification, we can use 
deleteRange+deleteFilesInRanges.

If disk space matters most, we can consider 
deleteRange+deleteFilesInRanges+CompactRanges.

(Of course, perhaps we can also look for ways to make space 
reclamation an asynchronous process.)
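
The three options above can be written down as a small lookup, sketched below. This is purely illustrative: `ClipStrategySketch` and its `Priority` enum are hypothetical, and the step names are plain labels taken from this comment, not calls into RocksDB.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical mapping from what matters most to the clean-up steps to run. */
final class ClipStrategySketch {
    private ClipStrategySketch() {}

    enum Priority { RECOVERY_TIME, BALANCED, DISK_SPACE }

    /** Returns the ordered step labels for the given priority. */
    static List<String> stepsFor(Priority priority) {
        switch (priority) {
            case RECOVERY_TIME:
                // Fastest restore; stale data may linger on disk.
                return Collections.singletonList("deleteRange");
            case BALANCED:
                // Also drops files that lie entirely inside the deleted ranges.
                return Arrays.asList("deleteRange", "deleteFilesInRanges");
            case DISK_SPACE:
                // Additionally compacts, trading restore time for space.
                return Arrays.asList("deleteRange", "deleteFilesInRanges", "CompactRanges");
            default:
                throw new AssertionError("Unhandled priority: " + priority);
        }
    }
}
```

If this trade-off were exposed to users, it would likely be a single option with values along these lines, so that the default can stay on the fastest path.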

> Rocksdb state has space amplification after rescaling with DeleteRange
> --
>
> Key: FLINK-34050
> URL: https://issues.apache.org/jira/browse/FLINK-34050
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Priority: Major
> Attachments: image-2024-01-10-21-23-48-134.png, 
> image-2024-01-10-21-24-10-983.png, image-2024-01-10-21-28-24-312.png
>
>
> FLINK-21321 uses deleteRange to speed up RocksDB rescaling; however, it 
> causes space amplification in some cases.
> We can reproduce this problem using wordCount job:
> 1) before rescaling, state operator in wordCount job has 2 parallelism and 
> 4G+ full checkpoint size;
> !image-2024-01-10-21-24-10-983.png|width=266,height=130!
> 2) then restart job with 4 parallelism (for state operator),  the full 
> checkpoint size of new job will be 8G+ ;
> 3) after many successful checkpoints, the full checkpoint size is still 8G+;
> !image-2024-01-10-21-28-24-312.png|width=454,height=111!
>  
> The root cause of this issue is that the deleted keyGroupRange does not 
> overlap with current DB keyGroupRange, so new data written into rocksdb after 
> rescaling almost never do LSM compaction with the deleted data (belonging to 
> other keyGroupRange.)
>  
> And the space amplification may affect Rocksdb read performance and disk 
> space usage after rescaling. It looks like a regression due to the 
> introduction of deleteRange for rescaling optimization.
>  
> To solve this problem, I think maybe we can invoke 
> RocksDB.deleteFilesInRanges after deleteRange?
> {code:java}
> public static void clipDBWithKeyGroupRange() {
>   //...
>   List<byte[]> ranges = new ArrayList<>();
>   //...
>   deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
>   ranges.add(beginKeyGroupBytes);
>   ranges.add(endKeyGroupBytes);
>   //
>   for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
>  db.deleteFilesInRanges(columnFamilyHandle, ranges, false);
>   }
> }
> {code}
>  





Re: [PR] [FLINK-33946] [rocksdb] set AvoidFlushDuringShutdown to true for avoiding Flush when Cancel Task [flink]

2024-01-10 Thread via GitHub


flinkbot commented on PR #24069:
URL: https://github.com/apache/flink/pull/24069#issuecomment-1886378530

   
   ## CI report:
   
   * 556acc2a010b25284766bb9eeed2e18f2a8c2eef UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-33946) RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel

2024-01-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33946:
---
Labels: pull-request-available  (was: )

> RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel
> 
>
> Key: FLINK-33946
> URL: https://issues.apache.org/jira/browse/FLINK-33946
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.19.0
>Reporter: Yue Ma
>Assignee: Yue Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> When a Job fails, the task needs to be canceled and re-deployed. 
> RocksDBStatebackend will call RocksDB.close when disposing.
> {code:java}
> if (!shutting_down_.load(std::memory_order_acquire) &&
>     has_unpersisted_data_.load(std::memory_order_relaxed) &&
>     !mutable_db_options_.avoid_flush_during_shutdown) {
>   if (immutable_db_options_.atomic_flush) {
>     autovector<ColumnFamilyData*> cfds;
>     SelectColumnFamiliesForAtomicFlush(&cfds);
>     mutex_.Unlock();
>     Status s =
>         AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
>     s.PermitUncheckedError();  //**TODO: What to do on error?
>     mutex_.Lock();
>   } else {
>     for (auto cfd : *versions_->GetColumnFamilySet()) {
>       if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
>         cfd->Ref();
>         mutex_.Unlock();
>         Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
>         s.PermitUncheckedError();  //**TODO: What to do on error?
>         mutex_.Lock();
>         cfd->UnrefAndTryDelete();
>       }
>     }
>   }
> }
> {code}
> By default (avoid_flush_during_shutdown=false), RocksDB flushes the memtable 
> on close. When disk pressure is high or the memtable is large, this flush 
> takes longer, which can leave the task stuck in the Canceling stage and slow 
> down job failover.
> In fact, flushing the memtable when a Flink task closes is unnecessary, 
> because the data can be replayed from the checkpoint. So we can set 
> avoid_flush_during_shutdown to true to speed up task failover.
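
The close-time guard described above can be sketched in a few lines of Java. This is a toy model only, not RocksDB or Flink code: `ToyStateStore` and all of its members are hypothetical names chosen for illustration, and the "flush" is just a list copy. It only shows that, with the flag set, close() skips the flush and drops the unpersisted data, which is acceptable if that data can be replayed from a checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a store with a memtable; hypothetical, not the RocksDB API. */
final class ToyStateStore {
    private final boolean avoidFlushDuringShutdown;
    private final List<String> memtable = new ArrayList<>();
    private final List<String> persisted = new ArrayList<>();
    private int flushCount = 0;

    ToyStateStore(boolean avoidFlushDuringShutdown) {
        this.avoidFlushDuringShutdown = avoidFlushDuringShutdown;
    }

    /** Buffers an entry in memory without persisting it. */
    void put(String entry) {
        memtable.add(entry);
    }

    /** Flushes on close only if there is unpersisted data and the flag is false. */
    void close() {
        if (!memtable.isEmpty() && !avoidFlushDuringShutdown) {
            flush();
        }
        memtable.clear(); // unflushed data is dropped; recovery would replay it
    }

    private void flush() {
        persisted.addAll(memtable);
        memtable.clear();
        flushCount++;
    }

    int flushCount() {
        return flushCount;
    }

    int persistedSize() {
        return persisted.size();
    }
}
```

With the flag left at false, one put followed by close() performs one flush; with the flag set to true, the same sequence performs none.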





[PR] [FLINK-33946] [rocksdb] set AvoidFlushDuringShutdown to true for avoiding Flush when Cancel Task [flink]

2024-01-10 Thread via GitHub


mayuehappy opened a new pull request, #24069:
URL: https://github.com/apache/flink/pull/24069

Set AvoidFlushDuringShutdown to true to avoid a flush when a task is canceled
   
   
   ## What is the purpose of the change
   
   Set AvoidFlushDuringShutdown to true to avoid flushing the memtable when a task is canceled.
   
   ## Brief change log
   
   Set AvoidFlushDuringShutdown to true in the default RocksDB options.
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework
   
   
   
   This change is already covered by existing tests
   
   
   This change added tests and can be verified as follows:
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yno)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no )
 - Anything that affects deployment or recovery:  flink-statebackend-rocksdb
 - The S3 file system connector: (no )
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   





Re: [PR] [FLINK-34051][checkpoint] Fix equals/hashCode/toString for SavepointRestoreSettings [flink]

2024-01-10 Thread via GitHub


masteryhx commented on code in PR #24066:
URL: https://github.com/apache/flink/pull/24066#discussion_r1448341514


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java:
##
@@ -105,12 +105,14 @@ public boolean equals(Object o) {
 
 SavepointRestoreSettings that = (SavepointRestoreSettings) o;
 return allowNonRestoredState == that.allowNonRestoredState
-&& (Objects.equals(restorePath, that.restorePath));
+&& Objects.equals(restorePath, that.restorePath)
+&& Objects.equals(restoreMode, that.restoreMode);
 }
 
 @Override
 public int hashCode() {
 int result = restorePath != null ? restorePath.hashCode() : 0;
+result = 31 * result + (restoreMode != null ? restoreMode.hashCode() : 
0);

Review Comment:
   Thanks for the suggestion, I think it's reasonable to mark it as '@ Nonnull'
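
   For reference, a minimal sketch of the pattern in the diff above: every field that takes part in `equals` also feeds `hashCode`, and a field that is guaranteed non-null needs no null check. `ToyRestoreSettings` and its `Mode` enum are hypothetical stand-ins for illustration, not the Flink classes.

```java
import java.util.Objects;

/** Hypothetical stand-in for illustration; not the Flink class. */
final class ToyRestoreSettings {
    enum Mode { CLAIM, NO_CLAIM }

    private final String restorePath;            // may be null
    private final boolean allowNonRestoredState;
    private final Mode restoreMode;              // never null, enforced below

    ToyRestoreSettings(String restorePath, boolean allowNonRestoredState, Mode restoreMode) {
        this.restorePath = restorePath;
        this.allowNonRestoredState = allowNonRestoredState;
        this.restoreMode = Objects.requireNonNull(restoreMode, "restoreMode");
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ToyRestoreSettings)) {
            return false;
        }
        ToyRestoreSettings that = (ToyRestoreSettings) o;
        // Compare all three fields so that settings differing only in mode are unequal.
        return allowNonRestoredState == that.allowNonRestoredState
                && Objects.equals(restorePath, that.restorePath)
                && restoreMode == that.restoreMode;
    }

    @Override
    public int hashCode() {
        // Uses the same fields as equals, keeping the two methods consistent.
        return Objects.hash(restorePath, allowNonRestoredState, restoreMode);
    }
}
```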






[jira] [Resolved] (FLINK-33881) [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull

2024-01-10 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-33881.
--
Resolution: Fixed

Thanks [~lijinzhong] for the great work!

merged 907d0f32 into master

> [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull
> 
>
> Key: FLINK-33881
> URL: https://issues.apache.org/jira/browse/FLINK-33881
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Minor
>  Labels: pull-request-available
> Attachments: image-2023-12-19-21-25-21-446.png, 
> image-2023-12-19-21-26-43-518.png
>
>
> In some scenarios, 'TtlListState#getUnexpiredOrNull -> 
> elementSerializer.copy(ttlValue)'  consumes a lot of cpu resources.
> !image-2023-12-19-21-25-21-446.png|width=529,height=119!
> I found that for TtlListState#getUnexpiredOrNull, if none of the elements 
> have expired, it still needs to copy all the elements and update the whole 
> list/map in TtlIncrementalCleanup#runCleanup();
> !image-2023-12-19-21-26-43-518.png|width=505,height=266!
> I think we could optimize TtlListState#getUnexpiredOrNull by:
> 1) finding the index of the first expired element in the list;
> 2) if none is found, returning the original list;
> 3) if one is found, constructing the unexpired list (copying the preceding 
> elements into it) and then going through the subsequent elements, adding the 
> unexpired ones to the list.
> {code:java}
> public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>> ttlValues) {
>     //...
>     int firstExpireIndex = -1;
>     for (int i = 0; i < ttlValues.size(); i++) {
>         if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
>             firstExpireIndex = i;
>             break;
>         }
>     }
>     if (firstExpireIndex == -1) {
>         return ttlValues;  //return the original ttlValues
>     }
>     List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
>     for (int i = 0; i < ttlValues.size(); i++) {
>         if (i < firstExpireIndex) {
>             // unexpired.add(ttlValues.get(i));
>             unexpired.add(elementSerializer.copy(ttlValues.get(i)));
>         }
>         if (i > firstExpireIndex) {
>             if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
>                 // unexpired.add(ttlValues.get(i));
>                 unexpired.add(elementSerializer.copy(ttlValues.get(i)));
>             }
>         }
>     }
>     // ...
> } {code}
> *In this way, the extra iteration overhead is actually very very small, but 
> the benefit when there are no expired elements is significant.*
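
The idea in the description can be tried out in isolation with the following self-contained sketch. It is not the Flink implementation: `UnexpiredFilter`, `unexpiredOrSame`, and the `expired` and `copy` parameters are hypothetical stand-ins for the TTL check and for `elementSerializer.copy`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

/** Hypothetical helper for illustration; not part of Flink. */
final class UnexpiredFilter {
    private UnexpiredFilter() {}

    /**
     * Returns the input list itself when no element has expired. Otherwise
     * returns a new list holding copies of the unexpired elements, in order.
     */
    static <T> List<T> unexpiredOrSame(
            List<T> values, Predicate<T> expired, UnaryOperator<T> copy) {
        // Step 1: find the first expired element, if any.
        int firstExpired = -1;
        for (int i = 0; i < values.size(); i++) {
            if (expired.test(values.get(i))) {
                firstExpired = i;
                break;
            }
        }
        // Step 2: nothing expired, so no copy and no new list are needed.
        if (firstExpired == -1) {
            return values;
        }
        // Step 3: elements before firstExpired are known to be unexpired;
        // elements after it still have to be checked.
        List<T> unexpired = new ArrayList<>(values.size());
        for (int i = 0; i < values.size(); i++) {
            boolean keep =
                    i < firstExpired || (i > firstExpired && !expired.test(values.get(i)));
            if (keep) {
                unexpired.add(copy.apply(values.get(i)));
            }
        }
        return unexpired;
    }
}
```

In the common case with no expired elements, the method costs one pass over the list and returns the same instance, so a caller can use a reference comparison to decide whether the stored list needs updating.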





Re: [PR] [FLINK-33881]Avoid copy and update value in TtlListState#getUnexpiredOrNull [flink]

2024-01-10 Thread via GitHub


masteryhx closed pull request #24057: [FLINK-33881]Avoid copy and update value 
in TtlListState#getUnexpiredOrNull
URL: https://github.com/apache/flink/pull/24057





Re: [PR] [FLINK-33881]Avoid copy and update value in TtlListState#getUnexpiredOrNull [flink]

2024-01-10 Thread via GitHub


masteryhx commented on PR #24057:
URL: https://github.com/apache/flink/pull/24057#issuecomment-1886347612

   merged.
   Just a reminder: please remember to include the component tag in the commit 
message next time.





Re: [PR] [FLINK-34049][table]Refactor classes related to window TVF aggregation to prepare for non-aligned windows [flink]

2024-01-10 Thread via GitHub


flinkbot commented on PR #24068:
URL: https://github.com/apache/flink/pull/24068#issuecomment-1886331556

   
   ## CI report:
   
   * fd074024357c148202e3ff3e218309480ba12569 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Comment Edited] (FLINK-34015) Setting `execution.savepoint.ignore-unclaimed-state` does not take effect when passing this parameter by dynamic properties

2024-01-10 Thread Renxiang Zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804983#comment-17804983
 ] 

Renxiang Zhou edited comment on FLINK-34015 at 1/11/24 5:48 AM:


[~masteryhx] Hello, hangxiang, please have a look at this issue if you have 
time, many thanks~

And I have implemented a fix for this little bug.
https://github.com/apache/flink/pull/24058


was (Author: JIRAUSER295459):
[~masteryhx] Hello, hangxiang, please have a look at this issue if you have 
time, many thanks~

And I have implemented a fix for this little bug.
[https://github.com/apache/flink/pull/24058|https://github.com/apache/flink/pull/24058,]

> Setting `execution.savepoint.ignore-unclaimed-state` does not take effect 
> when passing this parameter by dynamic properties
> ---
>
> Key: FLINK-34015
> URL: https://issues.apache.org/jira/browse/FLINK-34015
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.17.0
>Reporter: Renxiang Zhou
>Assignee: Renxiang Zhou
>Priority: Critical
>  Labels: ignore-unclaimed-state-invalid, pull-request-available
> Attachments: image-2024-01-08-14-22-09-758.png, 
> image-2024-01-08-14-24-30-665.png
>
>
> We set `execution.savepoint.ignore-unclaimed-state` to true via the -D option 
> when submitting the job, but the value is still false in the JobManager log.
> Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true when 
> submitting the job.
> !image-2024-01-08-14-22-09-758.png|width=1012,height=222!
> Pic 2: the value is still false in the JobManager log.
> !image-2024-01-08-14-24-30-665.png|width=651,height=51!
>  
> Besides, the parameter `execution.savepoint-restore-mode` has the same 
> problem when we pass it with the -D option.
>  





[jira] [Commented] (FLINK-34015) Setting `execution.savepoint.ignore-unclaimed-state` does not take effect when passing this parameter by dynamic properties

2024-01-10 Thread Renxiang Zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805378#comment-17805378
 ] 

Renxiang Zhou commented on FLINK-34015:
---

[~masteryhx] Many thanks to you, I will check it~(y)

> Setting `execution.savepoint.ignore-unclaimed-state` does not take effect 
> when passing this parameter by dynamic properties
> ---
>
> Key: FLINK-34015
> URL: https://issues.apache.org/jira/browse/FLINK-34015
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.17.0
>Reporter: Renxiang Zhou
>Assignee: Renxiang Zhou
>Priority: Critical
>  Labels: ignore-unclaimed-state-invalid, pull-request-available
> Attachments: image-2024-01-08-14-22-09-758.png, 
> image-2024-01-08-14-24-30-665.png
>
>
> We set `execution.savepoint.ignore-unclaimed-state` to true via the -D option 
> when submitting the job, but the value is still false in the JobManager log.
> Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true when 
> submitting the job.
> !image-2024-01-08-14-22-09-758.png|width=1012,height=222!
> Pic 2: the value is still false in the JobManager log.
> !image-2024-01-08-14-24-30-665.png|width=651,height=51!
>  
> Besides, the parameter `execution.savepoint-restore-mode` has the same 
> problem when we pass it with the -D option.
>  





[jira] [Updated] (FLINK-34049) Refactor classes related to window TVF aggregation to prepare for non-aligned windows

2024-01-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34049:
---
Labels: pull-request-available  (was: )

> Refactor classes related to window TVF aggregation to prepare for non-aligned 
> windows
> -
>
> Key: FLINK-34049
> URL: https://issues.apache.org/jira/browse/FLINK-34049
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
>
> Refactor classes related to window TVF aggregation such as 
> AbstractWindowAggProcessor to prepare for the implementation of non-aligned 
> windows like session window





[PR] [FLINK-34049][table]Refactor classes related to window TVF aggregation to prepare for non-aligned windows [flink]

2024-01-10 Thread via GitHub


xuyangzhong opened a new pull request, #24068:
URL: https://github.com/apache/flink/pull/24068

   ## What is the purpose of the change
   
   Currently, the implementation of the window aggregate operator with the new 
WINDOW TVF syntax is tightly bound to slicing windows such as TUMBLE, HOP and 
CUMULATE. This makes it difficult to introduce unslicing windows such as 
SESSION.
   
   This PR refactors some classes of the window aggregate operator to prepare 
for the introduction of unslicing windows.
   
   
   ## Brief change log
   
   *(for example:)*
 - *add two packages, 'groupwindow' and 'windowtvf', under 
'org.apache.flink.table.runtime.operators.window' to separate the 
implementations of the legacy group window agg and the new window TVF agg*
 - *move the related classes into these two new packages*
 - *extract abstract classes for the slicing window processor and operator*
   
   
   ## Verifying this change
   
   Existing tests cover this change.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? no
   





[jira] [Commented] (FLINK-33325) FLIP-375: Built-in cross-platform powerful java profiler

2024-01-10 Thread Yu Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805374#comment-17805374
 ] 

Yu Chen commented on FLINK-33325:
-

Hi [~Zhanghao Chen] , thanks for your attention, the feature will be available 
to everyone soon!

> FLIP-375: Built-in cross-platform powerful java profiler
> 
>
> Key: FLINK-33325
> URL: https://issues.apache.org/jira/browse/FLINK-33325
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST, Runtime / Web Frontend
>Affects Versions: 1.19.0
>Reporter: Yu Chen
>Assignee: Yu Chen
>Priority: Major
>
> This is an umbrella JIRA of 
> [FLIP-375|https://cwiki.apache.org/confluence/x/64lEE]





[jira] [Updated] (FLINK-33337) Expose IngestDB and ClipDB in the official RocksDB API

2024-01-10 Thread Yue Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yue Ma updated FLINK-33337:
---
Attachment: image-2024-01-11-12-03-14-308.png

> Expose IngestDB and ClipDB in the official RocksDB API
> --
>
> Key: FLINK-33337
> URL: https://issues.apache.org/jira/browse/FLINK-33337
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Piotr Nowojski
>Assignee: Yue Ma
>Priority: Major
> Attachments: image-2024-01-11-12-03-14-308.png
>
>
> Remaining open PRs:
> None :)
> Already merged PRs:
> https://github.com/facebook/rocksdb/pull/11646
> https://github.com/facebook/rocksdb/pull/11868
> https://github.com/facebook/rocksdb/pull/11811
> https://github.com/facebook/rocksdb/pull/11381
> https://github.com/facebook/rocksdb/pull/11379
> https://github.com/facebook/rocksdb/pull/11378





[jira] [Commented] (FLINK-33337) Expose IngestDB and ClipDB in the official RocksDB API

2024-01-10 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805364#comment-17805364
 ] 

Yue Ma commented on FLINK-33337:


Update: https://github.com/facebook/rocksdb/pull/12219
This PR fixes a bug that may cause incorrect ClipDB results 

> Expose IngestDB and ClipDB in the official RocksDB API
> --
>
> Key: FLINK-33337
> URL: https://issues.apache.org/jira/browse/FLINK-33337
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Piotr Nowojski
>Assignee: Yue Ma
>Priority: Major
> Attachments: image-2024-01-11-12-03-14-308.png
>
>
> Remaining open PRs:
> None :)
> Already merged PRs:
> https://github.com/facebook/rocksdb/pull/11646
> https://github.com/facebook/rocksdb/pull/11868
> https://github.com/facebook/rocksdb/pull/11811
> https://github.com/facebook/rocksdb/pull/11381
> https://github.com/facebook/rocksdb/pull/11379
> https://github.com/facebook/rocksdb/pull/11378





[jira] [Commented] (FLINK-34021) Print jobKey in the Autoscaler standalone log

2024-01-10 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805363#comment-17805363
 ] 

Rui Fan commented on FLINK-34021:
-

Merged to main(1.8.0) via : f67cba9c8269ddd4e27fcdc40cfd8a293ae665de

> Print jobKey in the Autoscaler standalone log
> -
>
> Key: FLINK-34021
> URL: https://issues.apache.org/jira/browse/FLINK-34021
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler
>Affects Versions: kubernetes-operator-1.8.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> FLINK-33814 added multi-threaded scaling to the autoscaler standalone. 
> When many jobs are scaling, the autoscaler standalone prints a lot of logs. 
> Currently, the log lines do not include the job key, so it is hard to tell 
> which job a line belongs to.
>  
>  





[jira] [Resolved] (FLINK-34021) Print jobKey in the Autoscaler standalone log

2024-01-10 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan resolved FLINK-34021.
-
Resolution: Fixed

> Print jobKey in the Autoscaler standalone log
> -
>
> Key: FLINK-34021
> URL: https://issues.apache.org/jira/browse/FLINK-34021
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler
>Affects Versions: kubernetes-operator-1.8.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> FLINK-33814 added multi-threaded scaling to the autoscaler standalone. 
> When many jobs are scaling, the autoscaler standalone prints a lot of logs. 
> Currently, the log lines do not include the job key, so it is hard to tell 
> which job a line belongs to.
>  
>  





Re: [PR] [FLINK-34021][autoscaler] Print jobKey in the Autoscaler standalone log [flink-kubernetes-operator]

2024-01-10 Thread via GitHub


1996fanrui merged PR #750:
URL: https://github.com/apache/flink-kubernetes-operator/pull/750





Re: [PR] [FLINK-34021][autoscaler] Print jobKey in the Autoscaler standalone log [flink-kubernetes-operator]

2024-01-10 Thread via GitHub


1996fanrui commented on PR #750:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/750#issuecomment-1886189083

   Thanks @mxm for the review, merging~





[jira] [Created] (FLINK-34059) Add documentation on how to use state TTL hint

2024-01-10 Thread Jane Chan (Jira)
Jane Chan created FLINK-34059:
-

 Summary: Add documentation on how to use state TTL hint
 Key: FLINK-34059
 URL: https://issues.apache.org/jira/browse/FLINK-34059
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation, Table SQL / API
Affects Versions: 1.19.0
Reporter: Jane Chan








Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2024-01-10 Thread via GitHub


1996fanrui commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1448255898


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##
@@ -367,17 +367,13 @@ private void restartTasksWithDelay(final 
FailureHandlingResult failureHandlingRe
 
 final CompletableFuture cancelFuture = 
cancelTasksAsync(verticesToRestart);
 
-final FailureHandlingResultSnapshot failureHandlingResultSnapshot =
-createFailureHandlingResultSnapshot(failureHandlingResult);
+archiveFromFailureHandlingResult(
+createFailureHandlingResultSnapshot(failureHandlingResult));
 delayExecutor.schedule(
 () ->
 FutureUtils.assertNoException(
 cancelFuture.thenRunAsync(
-() -> {
-archiveFromFailureHandlingResult(

Review Comment:
   Thanks for the clarification!
   
   The initial motivation might have been to collect all concurrent exceptions. 
In this PR, the solution is to save the `latestRootExceptionEntry` as a field in 
`SchedulerBase`, so that all subsequent non-root exceptions are added to the 
`latestRootExceptionEntry`.
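
   As a rough illustration of that idea (a toy model, not the scheduler code): `ToyExceptionHistory`, `RootEntry` and `archive` are hypothetical names, and failures are plain strings. A failure that starts a new attempt becomes a new root entry; any other failure is merged into the latest root entry.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model; class and method names are hypothetical, not the Flink API. */
final class ToyExceptionHistory {
    /** One root failure plus the failures that happened concurrently with it. */
    static final class RootEntry {
        final String rootCause;
        final List<String> concurrent = new ArrayList<>();

        RootEntry(String rootCause) {
            this.rootCause = rootCause;
        }
    }

    private final List<RootEntry> history = new ArrayList<>();
    private RootEntry latestRoot; // null until the first root failure is archived

    /** Archives a failure either as a new root entry or into the latest one. */
    void archive(String failure, boolean isNewAttempt) {
        if (isNewAttempt) {
            latestRoot = new RootEntry(failure);
            history.add(latestRoot);
        } else {
            if (latestRoot == null) {
                throw new IllegalStateException("It should have old failure.");
            }
            latestRoot.concurrent.add(failure);
        }
    }

    List<RootEntry> history() {
        return history;
    }
}
```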



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##
@@ -140,15 +142,20 @@ private static RootExceptionHistoryEntry 
createRootExceptionHistoryEntry(
 failureLabels,
 failingTaskName,
 taskManagerLocation,
-StreamSupport.stream(executions.spliterator(), false)
-.filter(execution -> 
execution.getFailureInfo().isPresent())
-.map(
-execution ->
-ExceptionHistoryEntry.create(
-execution,
-
execution.getVertexWithAttempt(),
-
FailureEnricherUtils.EMPTY_FAILURE_LABELS))
-.collect(Collectors.toList()));
+createExceptionHistoryEntries(executions));
+}
+
+public static List createExceptionHistoryEntries(

Review Comment:
   > we could move the logic into addConcurrentExceptions and call 
addConcurrentExceptions within createRootExceptionHistoryEntry on the newly 
created instance. 
   
   It makes sense to me, thanks~



##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java:
##
@@ -206,6 +214,11 @@ public boolean isGlobalFailure() {
 return globalFailure;
 }
 
+/** @return Whether this failure is a new attempt. */
+public boolean isNewAttempt() {

Review Comment:
   I added some comments to explain it. What do you think?



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##
@@ -96,7 +98,7 @@ public static RootExceptionHistoryEntry fromGlobalFailure(
 }
 
 public static RootExceptionHistoryEntry fromExceptionHistoryEntry(
-ExceptionHistoryEntry entry, Iterable 
entries) {
+ExceptionHistoryEntry entry, List entries) {

Review Comment:
   Changed it to `Collection`.
   
   We need to merge more exceptions into `concurrentExceptions`, and 
`Iterable` doesn't support adding elements, so I changed it to `Collection`.
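
   A small sketch of the difference, with hypothetical names (`ConcurrentEntriesSketch`, `mergeInto`) and strings in place of exception entries: `java.lang.Iterable` only offers iteration, while `java.util.Collection` declares `add`, which is what merging needs.

```java
import java.util.Collection;

/** Illustration only; the class and method names are hypothetical. */
final class ConcurrentEntriesSketch {
    private ConcurrentEntriesSketch() {}

    /** Appends every element of extra to target and returns the resulting size. */
    static int mergeInto(Collection<String> target, Iterable<String> extra) {
        for (String entry : extra) {
            target.add(entry); // add() is declared on Collection, not on Iterable
        }
        return target.size();
    }
}
```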



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##
@@ -703,27 +707,41 @@ private void archiveGlobalFailure(
 long timestamp,
 CompletableFuture> failureLabels,
 Iterable executions) {
-exceptionHistory.add(
+latestRootExceptionEntry =
 RootExceptionHistoryEntry.fromGlobalFailure(
-failure, timestamp, failureLabels, executions));
+failure, timestamp, failureLabels, executions);
+exceptionHistory.add(latestRootExceptionEntry);
 log.debug("Archive global failure.", failure);
 }
 
 protected final void archiveFromFailureHandlingResult(
 FailureHandlingResultSnapshot failureHandlingResult) {
+// ALl exceptions as the ConcurrentExceptions when it's not a new 
attempt.
+if (!failureHandlingResult.isNewAttempt()) {
+checkState(latestRootExceptionEntry != null, "It should have old 
failure.");
+List concurrentlyExecutions = new LinkedList<>();
+
failureHandlingResult.getRootCauseExecution().ifPresent(concurrentlyExecutions::add);
+
concurrentlyExecutions.addAll(failureHandlingResult.getConcurrentlyFailedExecution());

Review Comment:
   Thanks for the detailed review! 
   
   Good catch, I changed it to the `ArrayList`, 

[jira] [Assigned] (FLINK-33819) Support setting CompressType in RocksDBStateBackend

2024-01-10 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-33819:


Assignee: Yue Ma

> Support setting CompressType in RocksDBStateBackend
> ---
>
> Key: FLINK-33819
> URL: https://issues.apache.org/jira/browse/FLINK-33819
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Yue Ma
>Assignee: Yue Ma
>Priority: Major
> Fix For: 1.19.0
>
> Attachments: image-2023-12-14-11-32-32-968.png, 
> image-2023-12-14-11-35-22-306.png
>
>
> Currently, RocksDBStateBackend does not support setting the compression 
> level, and Snappy is used for compression by default. But we have some 
> scenarios where compression will use a lot of CPU resources. Turning off 
> compression can significantly reduce CPU overhead. So we may need to support 
> a parameter for users to set the CompressType of Rocksdb.
>   !image-2023-12-14-11-35-22-306.png!





[jira] [Assigned] (FLINK-33946) RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel

2024-01-10 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-33946:


Assignee: Yue Ma  (was: Hangxiang Yu)

> RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel
> 
>
> Key: FLINK-33946
> URL: https://issues.apache.org/jira/browse/FLINK-33946
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.19.0
>Reporter: Yue Ma
>Assignee: Yue Ma
>Priority: Major
> Fix For: 1.19.0
>
>
> When a Job fails, the task needs to be canceled and re-deployed. 
> RocksDBStatebackend will call RocksDB.close when disposing.
> {code:java}
> if (!shutting_down_.load(std::memory_order_acquire) &&
>     has_unpersisted_data_.load(std::memory_order_relaxed) &&
>     !mutable_db_options_.avoid_flush_during_shutdown) {
>   if (immutable_db_options_.atomic_flush) {
>     autovector<ColumnFamilyData*> cfds;
>     SelectColumnFamiliesForAtomicFlush(&cfds);
>     mutex_.Unlock();
>     Status s =
>         AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
>     s.PermitUncheckedError();  //**TODO: What to do on error?
>     mutex_.Lock();
>   } else {
>     for (auto cfd : *versions_->GetColumnFamilySet()) {
>       if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
>         cfd->Ref();
>         mutex_.Unlock();
>         Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
>         s.PermitUncheckedError();  //**TODO: What to do on error?
>         mutex_.Lock();
>         cfd->UnrefAndTryDelete();
>       }
>     }
>   }
> } {code}
> By default (avoid_flush_during_shutdown=false), RocksDB flushes the memtable 
> on close. When disk pressure is high or the memtable is large, this flush 
> takes longer, which can leave the task stuck in the Canceling stage and slow 
> down job failover.
> In fact, flushing the memtable when a Flink task closes is unnecessary, 
> because the data can be replayed from the checkpoint. So we can set 
> avoid_flush_during_shutdown to true to speed up task failover.





[jira] [Assigned] (FLINK-33946) RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel

2024-01-10 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-33946:


Assignee: Hangxiang Yu

> RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel
> 
>
> Key: FLINK-33946
> URL: https://issues.apache.org/jira/browse/FLINK-33946
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.19.0
>Reporter: Yue Ma
>Assignee: Hangxiang Yu
>Priority: Major
> Fix For: 1.19.0
>
>
> When a Job fails, the task needs to be canceled and re-deployed. 
> RocksDBStatebackend will call RocksDB.close when disposing.
> {code:java}
> if (!shutting_down_.load(std::memory_order_acquire) &&
>     has_unpersisted_data_.load(std::memory_order_relaxed) &&
>     !mutable_db_options_.avoid_flush_during_shutdown) {
>   if (immutable_db_options_.atomic_flush) {
>     autovector<ColumnFamilyData*> cfds;
>     SelectColumnFamiliesForAtomicFlush(&cfds);
>     mutex_.Unlock();
>     Status s =
>         AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
>     s.PermitUncheckedError();  //**TODO: What to do on error?
>     mutex_.Lock();
>   } else {
>     for (auto cfd : *versions_->GetColumnFamilySet()) {
>       if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
>         cfd->Ref();
>         mutex_.Unlock();
>         Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
>         s.PermitUncheckedError();  //**TODO: What to do on error?
>         mutex_.Lock();
>         cfd->UnrefAndTryDelete();
>       }
>     }
>   }
> } {code}
> By default (avoid_flush_during_shutdown=false), RocksDB flushes every 
> non-empty memtable when the database is closed. When disk pressure is high or 
> the memtables are large, this flush is slow, which leaves the Task stuck in 
> the Canceling state and delays job failover.
> This flush is unnecessary when a Flink Task is closed, because the data can 
> be replayed from the latest checkpoint. So we can set 
> avoid_flush_during_shutdown to true to speed up Task failover.
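
The close-path decision described above can be sketched with a toy model in plain Java. This is only an illustration under stated assumptions: `ToyStore`, `put` and `flushedRecords` are hypothetical names, and nothing here is the RocksDB or Flink API. It only mirrors the quoted condition: flush on close when there is unpersisted data and the option is off.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a store with an in-memory write buffer (hypothetical, not RocksDB). */
final class ToyStore implements AutoCloseable {
    private final boolean avoidFlushDuringShutdown;
    private final List<String> memtable = new ArrayList<>();
    private final List<String> disk = new ArrayList<>();

    ToyStore(boolean avoidFlushDuringShutdown) {
        this.avoidFlushDuringShutdown = avoidFlushDuringShutdown;
    }

    /** Buffers a record in memory only. */
    void put(String record) {
        memtable.add(record);
    }

    /** Number of records that were written to the simulated disk. */
    int flushedRecords() {
        return disk.size();
    }

    @Override
    public void close() {
        // Flush only if there is unpersisted data and the option is off.
        if (!memtable.isEmpty() && !avoidFlushDuringShutdown) {
            disk.addAll(memtable); // stands in for the potentially slow flush
        }
        memtable.clear();
    }
}
```

With the option set to true, `close()` drops the buffered records without writing them, which is acceptable only when the data can be recovered elsewhere, as the ticket argues is the case for checkpointed Flink state.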



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34015) Setting `execution.savepoint.ignore-unclaimed-state` does not take effect when passing this parameter by dynamic properties

2024-01-10 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805359#comment-17805359
 ] 

Hangxiang Yu commented on FLINK-34015:
--

[~zhourenxiang] Fine, it's valid.

I have assigned it to you and commented on your PR. Please take a look and go 
ahead.

> Setting `execution.savepoint.ignore-unclaimed-state` does not take effect 
> when passing this parameter by dynamic properties
> ---
>
> Key: FLINK-34015
> URL: https://issues.apache.org/jira/browse/FLINK-34015
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.17.0
>Reporter: Renxiang Zhou
>Assignee: Renxiang Zhou
>Priority: Critical
>  Labels: ignore-unclaimed-state-invalid, pull-request-available
> Attachments: image-2024-01-08-14-22-09-758.png, 
> image-2024-01-08-14-24-30-665.png
>
>
> We set `execution.savepoint.ignore-unclaimed-state` to true via the -D option 
> when submitting the job, but the value is still false in the 
> JobManager log.
> Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true when 
> submitting the job.
> !image-2024-01-08-14-22-09-758.png|width=1012,height=222!
> Pic 2: the value is still false in the JobManager log.
> !image-2024-01-08-14-24-30-665.png|width=651,height=51!
>  
> Besides, the parameter `execution.savepoint-restore-mode` has the same 
> problem when we pass it via the -D option.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34015) Setting `execution.savepoint.ignore-unclaimed-state` does not take effect when passing this parameter by dynamic properties

2024-01-10 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-34015:


Assignee: Hangxiang Yu

> Setting `execution.savepoint.ignore-unclaimed-state` does not take effect 
> when passing this parameter by dynamic properties
> ---
>
> Key: FLINK-34015
> URL: https://issues.apache.org/jira/browse/FLINK-34015
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.17.0
>Reporter: Renxiang Zhou
>Assignee: Hangxiang Yu
>Priority: Critical
>  Labels: ignore-unclaimed-state-invalid, pull-request-available
> Attachments: image-2024-01-08-14-22-09-758.png, 
> image-2024-01-08-14-24-30-665.png
>
>
> We set `execution.savepoint.ignore-unclaimed-state` to true via the -D option 
> when submitting the job, but the value is still false in the 
> JobManager log.
> Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true when 
> submitting the job.
> !image-2024-01-08-14-22-09-758.png|width=1012,height=222!
> Pic 2: the value is still false in the JobManager log.
> !image-2024-01-08-14-24-30-665.png|width=651,height=51!
>  
> Besides, the parameter `execution.savepoint-restore-mode` has the same 
> problem when we pass it via the -D option.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34015) Setting `execution.savepoint.ignore-unclaimed-state` does not take effect when passing this parameter by dynamic properties

2024-01-10 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-34015:


Assignee: Renxiang Zhou  (was: Hangxiang Yu)

> Setting `execution.savepoint.ignore-unclaimed-state` does not take effect 
> when passing this parameter by dynamic properties
> ---
>
> Key: FLINK-34015
> URL: https://issues.apache.org/jira/browse/FLINK-34015
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.17.0
>Reporter: Renxiang Zhou
>Assignee: Renxiang Zhou
>Priority: Critical
>  Labels: ignore-unclaimed-state-invalid, pull-request-available
> Attachments: image-2024-01-08-14-22-09-758.png, 
> image-2024-01-08-14-24-30-665.png
>
>
> We set `execution.savepoint.ignore-unclaimed-state` to true via the -D option 
> when submitting the job, but the value is still false in the 
> JobManager log.
> Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true when 
> submitting the job.
> !image-2024-01-08-14-22-09-758.png|width=1012,height=222!
> Pic 2: the value is still false in the JobManager log.
> !image-2024-01-08-14-24-30-665.png|width=651,height=51!
>  
> Besides, the parameter `execution.savepoint-restore-mode` has the same 
> problem when we pass it via the -D option.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34015][checkpoint] fix that passing ignore-unclaimed-state through dynamic props does not take effect [flink]

2024-01-10 Thread via GitHub


masteryhx commented on code in PR #24058:
URL: https://github.com/apache/flink/pull/24058#discussion_r1448208494


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java:
##
@@ -163,14 +163,16 @@ public static SavepointRestoreSettings forPath(
     public static void toConfiguration(
             final SavepointRestoreSettings savepointRestoreSettings,
             final Configuration configuration) {
-        configuration.set(
-                SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
-                savepointRestoreSettings.allowNonRestoredState());
-        configuration.set(
-                SavepointConfigOptions.RESTORE_MODE, savepointRestoreSettings.getRestoreMode());
-        final String savepointPath = savepointRestoreSettings.getRestorePath();
-        if (savepointPath != null) {
-            configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
+        if (!savepointRestoreSettings.equals(SavepointRestoreSettings.none())) {

Review Comment:
   I noticed that the equals() of SavepointRestoreSettings is not correct, so I 
created https://github.com/apache/flink/pull/24066 to fix that first. Reviews 
are welcome.



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java:
##
@@ -163,14 +163,16 @@ public static SavepointRestoreSettings forPath(
     public static void toConfiguration(
             final SavepointRestoreSettings savepointRestoreSettings,
             final Configuration configuration) {
-        configuration.set(
-                SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
-                savepointRestoreSettings.allowNonRestoredState());
-        configuration.set(
-                SavepointConfigOptions.RESTORE_MODE, savepointRestoreSettings.getRestoreMode());
-        final String savepointPath = savepointRestoreSettings.getRestorePath();
-        if (savepointPath != null) {
-            configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
+        if (!savepointRestoreSettings.equals(SavepointRestoreSettings.none())) {

Review Comment:
   I think this method is expected to write savepointRestoreSettings into the 
Configuration even for SavepointRestoreSettings.none.
   Some callers may deliberately use SavepointRestoreSettings.none to override 
the configuration, e.g. RemoteStreamEnvironment#getEffectiveConfiguration 
(although its logic does not look entirely correct either).
   So how about letting the caller decide whether to call toConfiguration? WDYT?
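
The trade-off raised in this review can be sketched with a plain-Java model. It is a minimal sketch under assumptions: `RestoreSettingsSketch`, `Settings`, `isNone` and `applyIfExplicit` are hypothetical stand-ins, a `Map` replaces Flink's `Configuration`, and only the `execution.savepoint.ignore-unclaimed-state` key from the ticket is modeled. It shows how an unconditional write of `none()` clobbers a value supplied by -D, and how a caller-side check avoids that.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the override problem; not Flink's actual classes. */
final class RestoreSettingsSketch {
    static final String IGNORE_UNCLAIMED = "execution.savepoint.ignore-unclaimed-state";

    /** Simplified stand-in for SavepointRestoreSettings. */
    static final class Settings {
        final String restorePath; // null means no savepoint was given on the CLI
        final boolean allowNonRestoredState;

        Settings(String restorePath, boolean allowNonRestoredState) {
            this.restorePath = restorePath;
            this.allowNonRestoredState = allowNonRestoredState;
        }

        static Settings none() {
            return new Settings(null, false);
        }

        boolean isNone() {
            return restorePath == null && !allowNonRestoredState;
        }
    }

    /** Always writes, even for none(): this overrides values passed by -D. */
    static void toConfiguration(Settings settings, Map<String, String> conf) {
        conf.put(IGNORE_UNCLAIMED, String.valueOf(settings.allowNonRestoredState));
    }

    /** Caller-side decision: only explicit settings are written. */
    static void applyIfExplicit(Settings settings, Map<String, String> conf) {
        if (!settings.isNone()) {
            toConfiguration(settings, conf);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(IGNORE_UNCLAIMED, "true"); // simulates -D on the command line
        applyIfExplicit(Settings.none(), conf);
        System.out.println(conf.get(IGNORE_UNCLAIMED));
    }
}
```

Keeping `toConfiguration` unconditional preserves the behavior for callers that rely on `none()` to reset the configuration, while callers such as the CLI path can opt out.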



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33946) RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel

2024-01-10 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805355#comment-17805355
 ] 

Yue Ma commented on FLINK-33946:


[~masteryhx] Thanks, I would like to take this, and I'll draft the PR soon.

> RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel
> 
>
> Key: FLINK-33946
> URL: https://issues.apache.org/jira/browse/FLINK-33946
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.19.0
>Reporter: Yue Ma
>Priority: Major
> Fix For: 1.19.0
>
>
> When a Job fails, the task needs to be canceled and re-deployed. 
> RocksDBStatebackend will call RocksDB.close when disposing.
> {code:cpp}
> if (!shutting_down_.load(std::memory_order_acquire) &&
>     has_unpersisted_data_.load(std::memory_order_relaxed) &&
>     !mutable_db_options_.avoid_flush_during_shutdown) {
>   if (immutable_db_options_.atomic_flush) {
>     autovector<ColumnFamilyData*> cfds;
>     SelectColumnFamiliesForAtomicFlush(&cfds);
>     mutex_.Unlock();
>     Status s =
>         AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
>     s.PermitUncheckedError();  //**TODO: What to do on error?
>     mutex_.Lock();
>   } else {
>     for (auto cfd : *versions_->GetColumnFamilySet()) {
>       if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
>         cfd->Ref();
>         mutex_.Unlock();
>         Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
>         s.PermitUncheckedError();  //**TODO: What to do on error?
>         mutex_.Lock();
>         cfd->UnrefAndTryDelete();
>       }
>     }
>   }
> }
> {code}
> By default (avoid_flush_during_shutdown=false), RocksDB flushes every 
> non-empty memtable when the database is closed. When disk pressure is high or 
> the memtables are large, this flush is slow, which leaves the Task stuck in 
> the Canceling state and delays job failover.
> This flush is unnecessary when a Flink Task is closed, because the data can 
> be replayed from the latest checkpoint. So we can set 
> avoid_flush_during_shutdown to true to speed up Task failover.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Test [flink]

2024-01-10 Thread via GitHub


WencongLiu closed pull request #23978: Test
URL: https://github.com/apache/flink/pull/23978


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33819) Support setting CompressType in RocksDBStateBackend

2024-01-10 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805354#comment-17805354
 ] 

Yue Ma commented on FLINK-33819:


[~masteryhx] Thanks, I would like to take this, and I'll draft the PR soon.

> Support setting CompressType in RocksDBStateBackend
> ---
>
> Key: FLINK-33819
> URL: https://issues.apache.org/jira/browse/FLINK-33819
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Yue Ma
>Priority: Major
> Fix For: 1.19.0
>
> Attachments: image-2023-12-14-11-32-32-968.png, 
> image-2023-12-14-11-35-22-306.png
>
>
> Currently, RocksDBStateBackend does not support setting the compression 
> level, and Snappy is used for compression by default. But we have some 
> scenarios where compression will use a lot of CPU resources. Turning off 
> compression can significantly reduce CPU overhead. So we may need to support 
> a parameter for users to set the CompressType of Rocksdb.
>   !image-2023-12-14-11-35-22-306.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34051][checkpoint] Fix equals/hashCode/toString for SavepointRestoreSettings [flink]

2024-01-10 Thread via GitHub


ljz2051 commented on code in PR #24066:
URL: https://github.com/apache/flink/pull/24066#discussion_r1448240936


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java:
##
@@ -105,12 +105,14 @@ public boolean equals(Object o) {
 
         SavepointRestoreSettings that = (SavepointRestoreSettings) o;
         return allowNonRestoredState == that.allowNonRestoredState
-                && (Objects.equals(restorePath, that.restorePath));
+                && Objects.equals(restorePath, that.restorePath)
+                && Objects.equals(restoreMode, that.restoreMode);
     }
 
     @Override
     public int hashCode() {
         int result = restorePath != null ? restorePath.hashCode() : 0;
+        result = 31 * result + (restoreMode != null ? restoreMode.hashCode() : 0);

Review Comment:
   Should the restoreMode property in SavepointRestoreSettings be annotated 
with @Nonnull?
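
The point of the fix and of the non-null question can be sketched in plain Java. This is a minimal sketch under assumptions: `RestoreSettingsModel` and its nested `RestoreMode` enum are hypothetical stand-ins rather than Flink's classes. If restoreMode is guaranteed non-null at construction, equals() and hashCode() can include it without null checks.

```java
import java.util.Objects;

/** Hypothetical model of a settings class whose equality covers all three fields. */
final class RestoreSettingsModel {
    /** Stand-in enum; the constant names are assumptions for illustration. */
    enum RestoreMode { CLAIM, NO_CLAIM, LEGACY }

    private final String restorePath; // may be null when no savepoint is set
    private final boolean allowNonRestoredState;
    private final RestoreMode restoreMode; // never null, enforced in the constructor

    RestoreSettingsModel(
            String restorePath, boolean allowNonRestoredState, RestoreMode restoreMode) {
        this.restorePath = restorePath;
        this.allowNonRestoredState = allowNonRestoredState;
        this.restoreMode = Objects.requireNonNull(restoreMode, "restoreMode");
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof RestoreSettingsModel)) {
            return false;
        }
        RestoreSettingsModel that = (RestoreSettingsModel) o;
        // All three fields take part, so instances differing only in mode are unequal.
        return allowNonRestoredState == that.allowNonRestoredState
                && Objects.equals(restorePath, that.restorePath)
                && restoreMode == that.restoreMode;
    }

    @Override
    public int hashCode() {
        // Uses the same fields as equals() to keep the equals/hashCode contract.
        return Objects.hash(restorePath, allowNonRestoredState, restoreMode);
    }
}
```

Enforcing non-null in the constructor moves the check to one place; whether the real class can give that guarantee depends on its factory methods, which this sketch does not model.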



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33009) tools/release/update_japicmp_configuration.sh should only enable binary compatibility checks in the release branch

2024-01-10 Thread Wencong Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805352#comment-17805352
 ] 

Wencong Liu commented on FLINK-33009:
-

I've opened a pull request and CI has passed. 😄

> tools/release/update_japicmp_configuration.sh should only enable binary 
> compatibility checks in the release branch
> --
>
> Key: FLINK-33009
> URL: https://issues.apache.org/jira/browse/FLINK-33009
> Project: Flink
>  Issue Type: Bug
>  Components: Release System
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Assignee: Wencong Liu
>Priority: Major
>  Labels: pull-request-available
>
> According to [Flink's API compatibility 
> constraints|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/],
>  we only support binary compatibility between patch versions. In 
> [apache-flink:pom.xml:2246|https://github.com/apache/flink/blob/aa8d93ea239f5be79066b7e5caad08d966c86ab2/pom.xml#L2246]
>  we have binary compatibility enabled even in {{master}}. This doesn't comply 
> with the rules. We should disable this flag in {{master}}. The 
> {{tools/release/update_japicmp_configuration.sh}} should enable this flag in 
> the release branch as part of the release process.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33905) FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs

2024-01-10 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song closed FLINK-33905.

Resolution: Done

master (1.19): 06b46a9cbf0d8fa987bbde570510f75a7558f54d

> FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs
> ---
>
> Key: FLINK-33905
> URL: https://issues.apache.org/jira/browse/FLINK-33905
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Affects Versions: 1.19.0
>Reporter: Wencong Liu
>Assignee: Wencong Liu
>Priority: Major
>  Labels: pull-request-available
>
> This ticket is proposed for 
> [FLIP-382|https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-10 Thread via GitHub


xintongsong closed pull request #23905: [FLINK-33905][core] Unify the Provision 
of Diverse Metadata for Context-like APIs
URL: https://github.com/apache/flink/pull/23905


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-34015) Setting `execution.savepoint.ignore-unclaimed-state` does not take effect when passing this parameter by dynamic properties

2024-01-10 Thread Renxiang Zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805349#comment-17805349
 ] 

Renxiang Zhou edited comment on FLINK-34015 at 1/11/24 2:36 AM:


[~masteryhx] Many thanks for your reply~
Yes, if we use the dynamic parameters to recover from a savepoint instead of 
using '-s' option, the CLI will generate the SavepointRestoreSettings.none() 
and set it to configuration.


was (Author: JIRAUSER295459):
[~masteryhx] Yes, if we use the dynamic parameters to recover from a savepoint 
instead of using '-s' option, the CLI will generate the 
SavepointRestoreSettings.none() and set it to configuration.

> Setting `execution.savepoint.ignore-unclaimed-state` does not take effect 
> when passing this parameter by dynamic properties
> ---
>
> Key: FLINK-34015
> URL: https://issues.apache.org/jira/browse/FLINK-34015
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.17.0
>Reporter: Renxiang Zhou
>Priority: Critical
>  Labels: ignore-unclaimed-state-invalid, pull-request-available
> Attachments: image-2024-01-08-14-22-09-758.png, 
> image-2024-01-08-14-24-30-665.png
>
>
> We set `execution.savepoint.ignore-unclaimed-state` to true via the -D option 
> when submitting the job, but the value is still false in the 
> JobManager log.
> Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true when 
> submitting the job.
> !image-2024-01-08-14-22-09-758.png|width=1012,height=222!
> Pic 2: the value is still false in the JobManager log.
> !image-2024-01-08-14-24-30-665.png|width=651,height=51!
>  
> Besides, the parameter `execution.savepoint-restore-mode` has the same 
> problem when we pass it via the -D option.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34015) Setting `execution.savepoint.ignore-unclaimed-state` does not take effect when passing this parameter by dynamic properties

2024-01-10 Thread Renxiang Zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805349#comment-17805349
 ] 

Renxiang Zhou commented on FLINK-34015:
---

[~masteryhx] Yes, if we use the dynamic parameters to recover from a savepoint 
instead of using '-s' option, the CLI will generate the 
SavepointRestoreSettings.none() and set it to configuration.

> Setting `execution.savepoint.ignore-unclaimed-state` does not take effect 
> when passing this parameter by dynamic properties
> ---
>
> Key: FLINK-34015
> URL: https://issues.apache.org/jira/browse/FLINK-34015
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.17.0
>Reporter: Renxiang Zhou
>Priority: Critical
>  Labels: ignore-unclaimed-state-invalid, pull-request-available
> Attachments: image-2024-01-08-14-22-09-758.png, 
> image-2024-01-08-14-24-30-665.png
>
>
> We set `execution.savepoint.ignore-unclaimed-state` to true via the -D option 
> when submitting the job, but the value is still false in the 
> JobManager log.
> Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true when 
> submitting the job.
> !image-2024-01-08-14-22-09-758.png|width=1012,height=222!
> Pic 2: the value is still false in the JobManager log.
> !image-2024-01-08-14-24-30-665.png|width=651,height=51!
>  
> Besides, the parameter `execution.savepoint-restore-mode` has the same 
> problem when we pass it via the -D option.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34015) Setting `execution.savepoint.ignore-unclaimed-state` does not take effect when passing this parameter by dynamic properties

2024-01-10 Thread Renxiang Zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805349#comment-17805349
 ] 

Renxiang Zhou edited comment on FLINK-34015 at 1/11/24 2:35 AM:


[~masteryhx] Yes, if we use the dynamic parameters to recover from a savepoint 
instead of using '-s' option, the CLI will generate the 
SavepointRestoreSettings.none() and set it to configuration.


was (Author: JIRAUSER295459):
[~masteryhx] Yes, if we use the dynamic parameters to recover from a savepoint 
instead of using '-s' option, the CLI will generate the 
SavepointRestoreSettings.none() and set it to configuration.

> Setting `execution.savepoint.ignore-unclaimed-state` does not take effect 
> when passing this parameter by dynamic properties
> ---
>
> Key: FLINK-34015
> URL: https://issues.apache.org/jira/browse/FLINK-34015
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.17.0
>Reporter: Renxiang Zhou
>Priority: Critical
>  Labels: ignore-unclaimed-state-invalid, pull-request-available
> Attachments: image-2024-01-08-14-22-09-758.png, 
> image-2024-01-08-14-24-30-665.png
>
>
> We set `execution.savepoint.ignore-unclaimed-state` to true via the -D option 
> when submitting the job, but the value is still false in the 
> JobManager log.
> Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true when 
> submitting the job.
> !image-2024-01-08-14-22-09-758.png|width=1012,height=222!
> Pic 2: the value is still false in the JobManager log.
> !image-2024-01-08-14-24-30-665.png|width=651,height=51!
>  
> Besides, the parameter `execution.savepoint-restore-mode` has the same 
> problem when we pass it via the -D option.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34058) Support optional parameters for named parameters

2024-01-10 Thread Feng Jin (Jira)
Feng Jin created FLINK-34058:


 Summary: Support optional parameters for named parameters
 Key: FLINK-34058
 URL: https://issues.apache.org/jira/browse/FLINK-34058
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Feng Jin
 Fix For: 1.19.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34055) Introduce a new annotation for named parameters

2024-01-10 Thread Feng Jin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jin updated FLINK-34055:
-
Summary: Introduce a new annotation for named parameters  (was: Introduce a 
new annotation for named parameters.)

> Introduce a new annotation for named parameters
> ---
>
> Key: FLINK-34055
> URL: https://issues.apache.org/jira/browse/FLINK-34055
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Feng Jin
>Priority: Major
> Fix For: 1.19.0
>
>
> Introduce a new annotation to specify the parameter name, indicate if it is 
> optional, and potentially support specifying default values in the future.
> Deprecate the argumentNames attribute of FunctionHint, as it is not 
> user-friendly for specifying argument names together with optional configuration.
>  
> {code:java}
> public @interface ArgumentHint {
>     /**
>      * The name of the parameter, default is an empty string.
>      */
>     String name() default "";
>
>     /**
>      * Whether the parameter is optional, default is false.
>      */
>     boolean isOptional() default false;
>
>     /**
>      * The data type hint for the parameter.
>      */
>     DataTypeHint type() default @DataTypeHint();
> }
> {code}
> {code:java}
> public @interface FunctionHint {
>
>     /**
>      * Deprecated attribute for specifying the names of the arguments.
>      * It is no longer recommended to use this attribute.
>      */
>     @Deprecated
>     String[] argumentNames() default {""};
>
>     /**
>      * Attribute for specifying the hints and additional information for
>      * function arguments.
>      */
>     ArgumentHint[] arguments() default {};
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34057) Support named parameters for functions

2024-01-10 Thread Feng Jin (Jira)
Feng Jin created FLINK-34057:


 Summary: Support named parameters for functions
 Key: FLINK-34057
 URL: https://issues.apache.org/jira/browse/FLINK-34057
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Feng Jin
 Fix For: 1.19.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34056) Support named parameters for procedures

2024-01-10 Thread Feng Jin (Jira)
Feng Jin created FLINK-34056:


 Summary: Support named parameters for procedures
 Key: FLINK-34056
 URL: https://issues.apache.org/jira/browse/FLINK-34056
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Feng Jin
 Fix For: 1.19.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32852) [JUnit5 Migration] The scheduler package of flink-runtime module

2024-01-10 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-32852:

Fix Version/s: 1.19.0

> [JUnit5 Migration] The scheduler package of flink-runtime module
> 
>
> Key: FLINK-32852
> URL: https://issues.apache.org/jira/browse/FLINK-32852
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Rui Fan
>Assignee: RocMarshal
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32852) [JUnit5 Migration] The scheduler package of flink-runtime module

2024-01-10 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805348#comment-17805348
 ] 

Rui Fan commented on FLINK-32852:
-

Merged to master(1.19) via a6412b8497d1fdb3c0137a1651767db33836d966

> [JUnit5 Migration] The scheduler package of flink-runtime module
> 
>
> Key: FLINK-32852
> URL: https://issues.apache.org/jira/browse/FLINK-32852
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Rui Fan
>Assignee: RocMarshal
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34054) FLIP-387: Support named parameters for functions and procedures

2024-01-10 Thread Feng Jin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jin updated FLINK-34054:
-
Summary: FLIP-387: Support named parameters for functions and procedures  
(was: FLIP-387: Support named parameters for functions and call procedures)

> FLIP-387: Support named parameters for functions and procedures
> ---
>
> Key: FLINK-34054
> URL: https://issues.apache.org/jira/browse/FLINK-34054
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: Feng Jin
>Priority: Major
> Fix For: 1.19.0
>
>
> Umbrella issue for 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-387%3A+Support+named+parameters+for+functions+and+call+procedures



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32852][JUnit5 migration] Migrate ExceptionHistoryEntryTest and RootExceptionHistoryEntryTest to Junit5 and Assertj [flink]

2024-01-10 Thread via GitHub


1996fanrui merged PR #24062:
URL: https://github.com/apache/flink/pull/24062


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32852][JUnit5 migration] Migrate ExceptionHistoryEntryTest and RootExceptionHistoryEntryTest to Junit5 and Assertj [flink]

2024-01-10 Thread via GitHub


1996fanrui commented on PR #24062:
URL: https://github.com/apache/flink/pull/24062#issuecomment-1886099946

   Thanks @RocMarshal for the review! CI is green, merging~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34055) Introduce a new annotation for named parameters.

2024-01-10 Thread Feng Jin (Jira)
Feng Jin created FLINK-34055:


 Summary: Introduce a new annotation for named parameters.
 Key: FLINK-34055
 URL: https://issues.apache.org/jira/browse/FLINK-34055
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Feng Jin
 Fix For: 1.19.0


Introduce a new annotation to specify the parameter name, indicate if it is 
optional, and potentially support specifying default values in the future.

Deprecate the argumentNames method in FunctionHint, as it is not user-friendly 
for specifying argument names together with optional configuration.

 
{code:java}
public @interface ArgumentHint {
    /**
     * The name of the parameter, default is an empty string.
     */
    String name() default "";

    /**
     * Whether the parameter is optional, default is false.
     */
    boolean isOptional() default false;

    /**
     * The data type hint for the parameter.
     */
    DataTypeHint type() default @DataTypeHint();
}
{code}



{code:java}
public @interface FunctionHint {

    /**
     * Deprecated attribute for specifying the names of the arguments.
     * It is no longer recommended to use this attribute.
     */
    @Deprecated
    String[] argumentNames() default {""};

    /**
     * Attribute for specifying the hints and additional information for
     * function arguments.
     */
    ArgumentHint[] arguments() default {};
}
{code}
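
As a rough illustration of how the proposed annotations might be used, the sketch below declares local stand-ins for `ArgumentHint` and `FunctionHint` (so it compiles without Flink) and reads the argument names back via reflection. The `PadFunction` class, the `describeArguments` helper, and the omission of the `type()` attribute are assumptions made for this example only; they are not part of the proposal.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class ArgumentHintSketch {

    // Local stand-in for the proposed annotation (the type() attribute is left out).
    // @Target({}) means it can only appear nested inside another annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({})
    public @interface ArgumentHint {
        String name() default "";
        boolean isOptional() default false;
    }

    // Local stand-in for FunctionHint, reduced to the new arguments() attribute.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface FunctionHint {
        ArgumentHint[] arguments() default {};
    }

    // Hypothetical function: pads the input with spaces up to an optional width.
    public static class PadFunction {
        @FunctionHint(arguments = {
            @ArgumentHint(name = "input"),
            @ArgumentHint(name = "width", isOptional = true)
        })
        public String eval(String input, Integer width) {
            int w = (width == null) ? 5 : width; // assumed default when omitted
            StringBuilder sb = new StringBuilder(input);
            while (sb.length() < w) {
                sb.append(' ');
            }
            return sb.toString();
        }
    }

    // Collects the declared argument names, marking optional ones with '?'.
    public static List<String> describeArguments(Class<?> functionClass) {
        List<String> out = new ArrayList<>();
        for (Method m : functionClass.getDeclaredMethods()) {
            FunctionHint hint = m.getAnnotation(FunctionHint.class);
            if (hint == null) {
                continue;
            }
            for (ArgumentHint arg : hint.arguments()) {
                out.add(arg.name() + (arg.isOptional() ? "?" : ""));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(describeArguments(PadFunction.class));
    }
}
```

Keeping the name and the optional flag in one annotation per argument is what a plain `String[] argumentNames()` cannot express, which appears to be the motivation for the deprecation above.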







Re: [PR] [FLINK-33792] Generate the same code for the same logic [flink]

2024-01-10 Thread via GitHub


lsyldliu commented on code in PR #23984:
URL: https://github.com/apache/flink/pull/23984#discussion_r1448202301


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala:
##
@@ -87,10 +90,11 @@ object LongHashJoinGenerator {
   def genProjection(
   tableConfig: ReadableConfig,
   classLoader: ClassLoader,
-  types: Array[LogicalType]): GeneratedProjection = {
+  types: Array[LogicalType],
+  parentCtx: CodeGeneratorContext): GeneratedProjection = {

Review Comment:
   I just mean that we can get the `tableConfig` and `classLoader` from 
`parentCtx`, but this isn't an important point, so feel free to ignore it for now.






[jira] [Created] (FLINK-34054) FLIP-387: Support named parameters for functions and call procedures

2024-01-10 Thread Feng Jin (Jira)
Feng Jin created FLINK-34054:


 Summary: FLIP-387: Support named parameters for functions and call 
procedures
 Key: FLINK-34054
 URL: https://issues.apache.org/jira/browse/FLINK-34054
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Reporter: Feng Jin
 Fix For: 1.19.0


Umbrella issue for 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-387%3A+Support+named+parameters+for+functions+and+call+procedures





Re: [PR] [hotfix] Revert the breaking change to the public implementations of RichFunction [flink]

2024-01-10 Thread via GitHub


flinkbot commented on PR #24067:
URL: https://github.com/apache/flink/pull/24067#issuecomment-1886092566

   
   ## CI report:
   
   * 5d2c318f914b6cc790dabdf318b1788d0e0128a6 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Assigned] (FLINK-34053) Support state TTL hint for group aggregate

2024-01-10 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan reassigned FLINK-34053:
-

Assignee: xuyang

> Support state TTL hint for group aggregate
> --
>
> Key: FLINK-34053
> URL: https://issues.apache.org/jira/browse/FLINK-34053
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: Jane Chan
>Assignee: xuyang
>Priority: Major
>






[jira] [Updated] (FLINK-30088) Excessive state updates for TtlMapState and TtlListState

2024-01-10 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu updated FLINK-30088:
-
Component/s: Runtime / State Backends

> Excessive state updates for TtlMapState and TtlListState
> 
>
> Key: FLINK-30088
> URL: https://issues.apache.org/jira/browse/FLINK-30088
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Roman Boyko
>Assignee: Roman Boyko
>Priority: Minor
>  Labels: pull-request-available, stale-assigned
> Attachments: image-2022-11-18-20-25-14-466.png, 
> image-2022-11-18-20-27-24-054.png
>
>
> After FLINK-21413 was merged, every TTL cleanup check for TtlMapState and 
> TtlListState (even when no elements have expired) leads to an update of the whole state.
> This is because of:
> - the comparison by reference inside `TtlIncrementalCleanup`:
> !image-2022-11-18-20-25-14-466.png|width=450,height=288!
> - and the creation of a new map or list inside TtlMapState or TtlListState:
> !image-2022-11-18-20-27-24-054.png|width=477,height=365!





[PR] [hotfix] Revert the breaking change to the public implementations of RichFunction [flink]

2024-01-10 Thread via GitHub


WencongLiu opened a new pull request, #24067:
URL: https://github.com/apache/flink/pull/24067

   ## What is the purpose of the change
   
   *Revert the breaking change to the public implementations of RichFunction. 
The original breaking change was introduced by 
[FLINK-32978](https://issues.apache.org/jira/browse/FLINK-32978?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel).*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   





[jira] [Commented] (FLINK-34053) Support state TTL hint for group aggregate

2024-01-10 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805346#comment-17805346
 ] 

xuyang commented on FLINK-34053:


Hi [~qingyue], can I take this Jira ticket?

> Support state TTL hint for group aggregate
> --
>
> Key: FLINK-34053
> URL: https://issues.apache.org/jira/browse/FLINK-34053
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: Jane Chan
>Priority: Major
>






[jira] [Updated] (FLINK-34052) Missing TopSpeedWindowing and SessionWindowing JARs in Flink Maven Repository

2024-01-10 Thread Junrui Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Junrui Li updated FLINK-34052:
--
Component/s: Examples

> Missing TopSpeedWindowing and SessionWindowing JARs in Flink Maven Repository
> -
>
> Key: FLINK-34052
> URL: https://issues.apache.org/jira/browse/FLINK-34052
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Examples
>Affects Versions: 1.18.0
>Reporter: Junrui Li
>Priority: Major
>
> As a result of the changes implemented in FLINK-32821, the build process no 
> longer produces artifacts with the names 
> flink-examples-streaming-1.x-TopSpeedWindowing.jar and 
> flink-examples-streaming-1.x-SessionWindowing.jar. This has led to the 
> absence of these specific JAR files in the Maven repository 
> (https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/1.18.0/).
> These artifacts were previously available and may still be expected by users 
> as part of their application dependencies. Their removal could potentially 
> break existing build pipelines and applications that depend on these example 
> JARs.





[jira] [Created] (FLINK-34053) Support state TTL hint for group aggregate

2024-01-10 Thread Jane Chan (Jira)
Jane Chan created FLINK-34053:
-

 Summary: Support state TTL hint for group aggregate
 Key: FLINK-34053
 URL: https://issues.apache.org/jira/browse/FLINK-34053
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Affects Versions: 1.19.0
Reporter: Jane Chan








[jira] [Resolved] (FLINK-33583) Support state TTL hint for regular join

2024-01-10 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan resolved FLINK-33583.
---
Fix Version/s: 1.19.0
   Resolution: Fixed

> Support state TTL hint for regular join
> ---
>
> Key: FLINK-33583
> URL: https://issues.apache.org/jira/browse/FLINK-33583
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>






[jira] [Closed] (FLINK-33583) Support state TTL hint for regular join

2024-01-10 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan closed FLINK-33583.
-

> Support state TTL hint for regular join
> ---
>
> Key: FLINK-33583
> URL: https://issues.apache.org/jira/browse/FLINK-33583
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>






[jira] [Commented] (FLINK-33583) Support state TTL hint for regular join

2024-01-10 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805345#comment-17805345
 ] 

Jane Chan commented on FLINK-33583:
---

Fixed in master 21403e31f4761bdddf5e4e802e0e5eb9b4533202

> Support state TTL hint for regular join
> ---
>
> Key: FLINK-33583
> URL: https://issues.apache.org/jira/browse/FLINK-33583
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
>






Re: [PR] [FLINK-33583][table-planner] Support state ttl hint for regular join [flink]

2024-01-10 Thread via GitHub


LadyForest merged PR #23752:
URL: https://github.com/apache/flink/pull/23752





Re: [PR] [FLINK-33879] Avoids the potential hang of Hybrid Shuffle during redistribution [flink]

2024-01-10 Thread via GitHub


reswqa commented on code in PR #23957:
URL: https://github.com/apache/flink/pull/23957#discussion_r1448184760


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java:
##
@@ -187,6 +187,7 @@ public void finish() throws IOException {
 
 @Override
 public void close() {
+storageMemoryManager.release();

Review Comment:
   I wonder how we guarantee that `numRequestedBuffers.get() > 
bufferPool.getNumBuffers()` holds at this point. Otherwise, the buffer will be 
recycled to `bufferQueue` instead of `LocalBufferPool`.



##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java:
##
@@ -192,15 +190,38 @@ void testMetricsUpdateForBroadcastOnlyResultPartition() throws Exception {
 try (TieredResultPartition partition =
 createTieredStoreResultPartition(2, bufferPool, true)) {
 partition.broadcastRecord(ByteBuffer.allocate(bufferSize));
-IOMetrics ioMetrics = taskIOMetricGroup.createSnapshot();
-assertThat(ioMetrics.getResultPartitionBytes()).hasSize(1);
-ResultPartitionBytes partitionBytes =
-ioMetrics.getResultPartitionBytes().values().iterator().next();
-assertThat(partitionBytes.getSubpartitionBytes())
-.containsExactly(bufferSize, bufferSize);
+verifySubpartitionBytes(bufferSize, bufferSize);
 }
 }
 
+@Test
+@Timeout(60)

Review Comment:
   Local timeouts should be avoided according to 
https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#avoid-timeouts-in-junit-tests.






Re: [PR] [FLINK-34051][checkpoint] Fix equals/hashCode/toString for SavepointRestoreSettings [flink]

2024-01-10 Thread via GitHub


flinkbot commented on PR #24066:
URL: https://github.com/apache/flink/pull/24066#issuecomment-1886063614

   
   ## CI report:
   
   * 96b1dc66449cb7782351ac379f78fdd0a29cb8ac UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-34051) Fix equals/hashCode/toString for SavepointRestoreSettings

2024-01-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34051:
---
Labels: pull-request-available  (was: )

> Fix equals/hashCode/toString for SavepointRestoreSettings
> -
>
> Key: FLINK-34051
> URL: https://issues.apache.org/jira/browse/FLINK-34051
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.19.0
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Minor
>  Labels: pull-request-available
>
> SavepointRestoreSettings#equals/hashCode/toString do not take the restoreMode property into account.





[PR] [FLINK-34051][checkpoint] Fix equals/hashCode/toString for SavepointRestoreSettings [flink]

2024-01-10 Thread via GitHub


masteryhx opened a new pull request, #24066:
URL: https://github.com/apache/flink/pull/24066

   
   
   ## What is the purpose of the change
   
   SavepointRestoreSettings#equals/hashCode/toString do not take the restoreMode property into account.
   
   ## Brief change log
   
 - Add restoreMode property for SavepointRestoreSettings
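
   The kind of fix described here can be sketched with a small stand-in class. `RestoreSettingsSketch`, its fields, and the `Mode` enum are hypothetical names chosen for illustration; this is not the actual `SavepointRestoreSettings` code, only the general pattern of including every property in `equals`, `hashCode`, and `toString`.

```java
import java.util.Objects;

public final class RestoreSettingsSketch {

    // Hypothetical stand-in for the restore mode enum.
    public enum Mode { CLAIM, NO_CLAIM }

    private final String restorePath;
    private final boolean allowNonRestoredState;
    private final Mode restoreMode;

    public RestoreSettingsSketch(String restorePath, boolean allowNonRestoredState, Mode restoreMode) {
        this.restorePath = restorePath;
        this.allowNonRestoredState = allowNonRestoredState;
        this.restoreMode = restoreMode;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof RestoreSettingsSketch)) {
            return false;
        }
        RestoreSettingsSketch that = (RestoreSettingsSketch) o;
        // restoreMode participates in equality alongside the other fields.
        return allowNonRestoredState == that.allowNonRestoredState
                && Objects.equals(restorePath, that.restorePath)
                && restoreMode == that.restoreMode;
    }

    @Override
    public int hashCode() {
        // Must use the same fields as equals() to keep the contract consistent.
        return Objects.hash(restorePath, allowNonRestoredState, restoreMode);
    }

    @Override
    public String toString() {
        return "RestoreSettingsSketch{restorePath='" + restorePath
                + "', allowNonRestoredState=" + allowNonRestoredState
                + ", restoreMode=" + restoreMode + '}';
    }
}
```

   Two instances that differ only in their restore mode then compare as unequal, which is the behavior a test for this change would presumably assert.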
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - Added SavepointRestoreSettingsTest
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? no
   





Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]

2024-01-10 Thread via GitHub


mas-chen commented on code in PR #44:
URL: 
https://github.com/apache/flink-connector-kafka/pull/44#discussion_r1448066046


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/DynamicKafkaSource.java:
##
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source;

Review Comment:
   Let me know if you have any other considerations. I'll squash the commits if 
we are in agreement (leaving the dangling commit now for your reviewing 
convenience)






[PR] Add Hive 3.0.0 connector [flink-web]

2024-01-10 Thread via GitHub


snuyanzin opened a new pull request, #709:
URL: https://github.com/apache/flink-web/pull/709

   (no comment)





Re: [PR] [FLINK-33611] [flink-protobuf] Support Large Protobuf Schemas [flink]

2024-01-10 Thread via GitHub


sharath1709 commented on PR #23937:
URL: https://github.com/apache/flink/pull/23937#issuecomment-1885748350

   @libenchao Could you review this PR again? It has been updated as per our 
discussion on the JIRA ticket, and I also added a test case that fails without 
the changes in this PR.





[jira] [Commented] (FLINK-34007) Flink Job stuck in suspend state after losing leadership in HA Mode

2024-01-10 Thread Zhenqiu Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805294#comment-17805294
 ] 

Zhenqiu Huang commented on FLINK-34007:
---

[~mapohl]
Yes, I mistakenly looked into the Flink 1.17 source code. I uploaded another 
debug log above. The KubernetesLeaderElector checks the annotation 
"control-plane.alpha.kubernetes.io/leader" and whether the lockIdentity exists 
in its content. Given that this job has only one JobManager, there should be no 
other JobManager instance trying to acquire the lock. The only possibility is 
that somehow the cluster config map is returned incorrectly.

In this case, even though the fabric8 LeaderElector will continue to try to 
acquire leadership (if it can get it without exceeding the deadline), Flink will 
not be able to restart services (such as the RM and dispatcher), as the 
DefaultLeaderRetrievalService is also stopped. To resolve the issue for now, 
should we focus on gracefully shutting down the JobManager rather than moving 
the job to the Suspended status?


> Flink Job stuck in suspend state after losing leadership in HA Mode
> ---
>
> Key: FLINK-34007
> URL: https://issues.apache.org/jira/browse/FLINK-34007
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.3, 1.17.2, 1.18.1, 1.18.2
>Reporter: Zhenqiu Huang
>Priority: Major
> Attachments: Debug.log, job-manager.log
>
>
> The observation is that the JobManager goes into the suspended state, with a failed 
> container not able to register itself with the ResourceManager after the timeout.
> See the attached JM log.
>  





[jira] [Commented] (FLINK-34023) Expose Kinesis client retry config in sink

2024-01-10 Thread Brad Atcheson (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805279#comment-17805279
 ] 

Brad Atcheson commented on FLINK-34023:
---

Yes. I'll look into it, post a proposed solution approach here, and wait for 
agreement before starting to implement.

> Expose Kinesis client retry config in sink
> --
>
> Key: FLINK-34023
> URL: https://issues.apache.org/jira/browse/FLINK-34023
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Reporter: Brad Atcheson
>Priority: Major
>
> The consumer side exposes client retry configuration like 
> [flink.shard.getrecords.maxretries|https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.html#SHARD_GETRECORDS_RETRIES]
>  but the producer side lacks similar config for PutRecords.
> The KinesisStreamsSinkWriter constructor calls 
> {code}
> this.httpClient = 
> AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties);
> this.kinesisClient = buildClient(kinesisClientProperties, this.httpClient);
> {code}
> But those methods only refer to these values (aside from 
> endpoint/region/creds) in the kinesisClientProperties:
> * aws.http-client.max-concurrency
> * aws.http-client.read-timeout
> * aws.trust.all.certificates
> * aws.http.protocol.version
> Without control over retry, users can observe exceptions like {code}Request 
> attempt 2 failure: Unable to execute HTTP request: connection timed out after 
> 2000 ms: kinesis.us-west-2.amazonaws.com{code}
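
To illustrate what a user-configurable retry limit on the producer side might look like, here is a minimal sketch in plain Java. The property key `example.sink.putrecords.maxretries` and the helpers `maxRetries` and `callWithRetries` are hypothetical; they are not existing connector options or AWS SDK APIs, and the real solution may well delegate to the SDK's own retry policy instead.

```java
import java.util.Properties;
import java.util.concurrent.Callable;

public class RetryConfigSketch {

    // Hypothetical key, used only for this illustration.
    static final String MAX_RETRIES_KEY = "example.sink.putrecords.maxretries";

    // Reads the retry limit from the client properties, falling back to a default.
    static int maxRetries(Properties props, int defaultValue) {
        return Integer.parseInt(props.getProperty(MAX_RETRIES_KEY, String.valueOf(defaultValue)));
    }

    // Runs the call once and retries up to maxRetries more times on failure.
    static <T> T callWithRetries(Callable<T> call, int maxRetries) throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty(MAX_RETRIES_KEY, "2");

        int[] calls = {0};
        // Simulated request that times out twice before succeeding.
        String result = callWithRetries(() -> {
            if (calls[0]++ < 2) {
                throw new RuntimeException("connection timed out");
            }
            return "ok";
        }, maxRetries(props, 0));
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```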





[jira] [Updated] (FLINK-34007) Flink Job stuck in suspend state after losing leadership in HA Mode

2024-01-10 Thread Zhenqiu Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenqiu Huang updated FLINK-34007:
--
Attachment: Debug.log

> Flink Job stuck in suspend state after losing leadership in HA Mode
> ---
>
> Key: FLINK-34007
> URL: https://issues.apache.org/jira/browse/FLINK-34007
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.3, 1.17.2, 1.18.1, 1.18.2
>Reporter: Zhenqiu Huang
>Priority: Major
> Attachments: Debug.log, job-manager.log
>
>
> The observation is that the JobManager goes into the suspended state, with a failed 
> container not able to register itself with the ResourceManager after the timeout.
> See the attached JM log.
>  





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-10 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1885629516

   > I haven't gone through all the code and comments yet, there are some 
inline comments below, however, two questions came into my mind while reviewing 
the PR:
   > 
   > * Why do you need to refactor `ParameterizedPredicate` and 
`JdbcFilterPushdownPreparedStatementVisitor`? Can you just adapt 
`JdbcRowDataLookupFunction` just like `JdbcRowDataInputFormat`?
   > * Have you considered `PrepareStatement` way to handle literals, as we 
have already discussed in the Jira? (I'm not sure about this, but looking at 
`JdbcRowDataLookupFunction`, it seems no place are handling this, so I assume 
that the implementation does not address that)
   
   I am new to this area. I was looking to work within the existing design with 
minimal changes (as the design was added in a FLIP and works for scan queries). 
I think we have something that solves this critical issue. @libenchao, would 
you be OK to proceed with this design and raise a subsequent issue / FLIP for a 
more elegant design? Or are you thinking this design is not appropriate and 
should not be merged? 





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-10 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1447891687


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -78,6 +79,8 @@ public class JdbcDynamicTableSource
 private List resolvedPredicates = new ArrayList<>();
 private Serializable[] pushdownParams = new Serializable[0];
 
+private List pushdownParameterizedPredicates = new ArrayList<>();

Review Comment:
   We tried that originally, but the problem was that the template contains a `?` 
and backticks, both of which could also appear in a column name. So last week I 
changed the design to pass through the index of the placeholder, to remove the ambiguity.   
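
   The ambiguity can be shown with a small standalone sketch. The class `PlaceholderIndexSketch`, the `substituteAt` helper, and the example column name are hypothetical and are not the code in this PR; the sketch only demonstrates why searching a template for `?` is unreliable while a recorded placeholder index is not.

```java
public class PlaceholderIndexSketch {

    // Replaces the placeholder at a known position instead of searching for '?'.
    static String substituteAt(String template, int placeholderIndex, String replacement) {
        if (template.charAt(placeholderIndex) != '?') {
            throw new IllegalArgumentException("No placeholder at index " + placeholderIndex);
        }
        return template.substring(0, placeholderIndex)
                + replacement
                + template.substring(placeholderIndex + 1);
    }

    public static void main(String[] args) {
        // The quoted column name itself contains a '?'.
        String template = "`is_valid?` = ?";

        // Naive approach: the first '?' found belongs to the column name.
        String naive = template.replaceFirst("\\?", "'Y'");
        System.out.println(naive);

        // Index-based approach: the real placeholder's position was recorded
        // when the template was built (here it is the last character).
        int placeholderIndex = template.length() - 1;
        System.out.println(substituteAt(template, placeholderIndex, "'Y'"));
    }
}
```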






Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]

2024-01-10 Thread via GitHub


mas-chen commented on code in PR #44:
URL: 
https://github.com/apache/flink-connector-kafka/pull/44#discussion_r1447804230


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/DynamicKafkaSource.java:
##
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source;

Review Comment:
   I actually prefer `dynamic.source`, since it makes sense for future use cases 
to leverage the dynamic code, e.g. a `dynamic.sink`. That being said, I'm 
going to move the metadata up one package since that isn't source-specific. 






Re: [PR] [FLINK-33971]Specifies whether to use HBase table that supports dynamic columns. [flink-connector-hbase]

2024-01-10 Thread via GitHub


ferenc-csaky commented on PR #36:
URL: 
https://github.com/apache/flink-connector-hbase/pull/36#issuecomment-1885437651

   > @ferenc-csaky Want to take a look?
   
   I'll review it in the next 2 days. Thanks for your contribution @MOBIN-F!





Re: [PR] [FLINK-28915] Support fetching artifacts in native K8s and standalone application mode [flink]

2024-01-10 Thread via GitHub


flinkbot commented on PR #24065:
URL: https://github.com/apache/flink/pull/24065#issuecomment-1885431512

   
   ## CI report:
   
   * b779d13403028c290b803501a66f8c212a92ad8b UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-28915] Support fetching artifacts in native K8s and standalone application mode [flink]

2024-01-10 Thread via GitHub


ferenc-csaky opened a new pull request, #24065:
URL: https://github.com/apache/flink/pull/24065

   ## What is the purpose of the change
   
   Support fetching remote job JAR and additional artifacts (UDFs, formats, 
dependencies, etc.) in native Kubernetes and standalone application mode. The 
current change contains fetchers for DFS (via Flink FS abstraction) and HTTP.
   
   ## Brief change log
   
   * In standalone app mode, a `--jars` option is added. It accepts any number of 
arguments, and the listed artifacts are fetched before the Flink cluster starts. Example:
   ```sh
   ./bin/standalone-job.sh start-foreground \
     --jars http://localhost:/flink-sandbox.jar http://localhost:/test-udf.jar \
 --job-classname org.apache.flink.DummyJob
   ```
   * In native K8s app mode, the user can define additional artifacts via the 
`user.artifacts.artifact-list` property. Example:
   ```sh
   ./bin/flink run-application \
     --target kubernetes-application \
     -Dkubernetes.cluster-id=flink-cluster \
     -Dkubernetes.container.image.ref=flink \
     -Duser.artifacts.artifact-list=http://host.minikube.internal:/test-udf.jar \
     http://host.minikube.internal:/flink-sandbox.jar
   ```
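
As a rough illustration of how a fetcher could be picked for each artifact, here is a minimal sketch. It assumes the choice is made on the URI scheme, with `http`/`https` going to the HTTP fetcher and everything else to the file-system fetcher. The class, enum, and method names are hypothetical and are not taken from the PR.

```java
import java.net.URI;

// Hypothetical sketch: names are illustrative and not taken from the PR.
class ArtifactFetcherChooser {

    // The two fetcher families mentioned in the PR description.
    enum FetcherKind { HTTP, FILE_SYSTEM }

    // Assumption: http/https URIs use the HTTP fetcher; any other scheme
    // (or no scheme at all) falls back to the file-system fetcher.
    static FetcherKind choose(URI artifactUri) {
        String scheme = artifactUri.getScheme();
        if ("http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme)) {
            return FetcherKind.HTTP;
        }
        return FetcherKind.FILE_SYSTEM;
    }

    public static void main(String[] args) {
        System.out.println(choose(URI.create("http://example.org/test-udf.jar")));
        System.out.println(choose(URI.create("hdfs:///jars/flink-sandbox.jar")));
    }
}
```

Whether the actual change dispatches this way is not stated above; the sketch only shows one plausible shape for it.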
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   * Added tests for artifact fetching utils.
   * Added tests for artifact fetching logic.
   * Added tests to cover the changes in `DefaultPackagedProgramRetriever`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: Artifact deployment in 
native Kubernetes app mode.
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs
   





Re: [PR] [FLINK-31481][table] Support enhanced show databases syntax [flink]

2024-01-10 Thread via GitHub


jeyhunkarimov commented on code in PR #23612:
URL: https://github.com/apache/flink/pull/23612#discussion_r1447763299


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowDatabasesConverter.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.dql.SqlShowDatabases;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ShowDatabasesOperation;
+
+/** A converter for {@link SqlShowDatabases}. */
+public class SqlShowDatabasesConverter implements 
SqlNodeConverter {
+
+@Override
+public Operation convertSqlNode(SqlShowDatabases sqlShowDatabases, 
ConvertContext context) {
+if (sqlShowDatabases.getPreposition() == null) {
+return new ShowDatabasesOperation(
+sqlShowDatabases.getLikeType(),
+sqlShowDatabases.getLikeSqlPattern(),
+sqlShowDatabases.isNotLike());
+} else {
+CatalogManager catalogManager = context.getCatalogManager();
+String[] fullCatalogName = sqlShowDatabases.getCatalog();
+String catalogName =
+fullCatalogName.length == 0
+? catalogManager.getCurrentCatalog()
+: fullCatalogName[0];

Review Comment:
   Sorry for the confusion. 
   No, there is no case in which `preposition` is non-null and `catalog` is 
`null`. 
   In fact, based on our discussion, I have already removed the `preposition` 
field from `ShowDatabasesOperation`.
   
   `catalog` is null (as `preposition` was, before it was removed) only when 
there is no `FROM/IN` clause. 
   
   Do the changes in `ShowDatabasesOperation` reflect your comments, or did I 
miss something?
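
To make the rule concrete, here is a minimal, self-contained sketch of the fallback: with no `FROM/IN` clause the current catalog is used, otherwise the first name component. The class and method names are hypothetical, and the extra `null` check is an assumption added for safety. The quoted converter only tests `length == 0`.

```java
// Hypothetical sketch: not the converter from the PR, only the fallback rule.
class CatalogNameResolver {

    // fullCatalogName is empty (or, by assumption, null) when the statement
    // has no FROM/IN clause; in that case the current catalog is used.
    static String resolve(String[] fullCatalogName, String currentCatalog) {
        if (fullCatalogName == null || fullCatalogName.length == 0) {
            return currentCatalog;
        }
        return fullCatalogName[0];
    }

    public static void main(String[] args) {
        System.out.println(resolve(new String[0], "default_catalog"));
        System.out.println(resolve(new String[] {"my_catalog"}, "default_catalog"));
    }
}
```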
   






Re: [PR] [FLINK-33969] Implement restore tests for TableSourceScan node [flink]

2024-01-10 Thread via GitHub


bvarghese1 commented on code in PR #24020:
URL: https://github.com/apache/flink/pull/24020#discussion_r1447722550


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceScanTestPrograms.java:
##
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.flink.types.Row;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecTableSourceScan}. */
+public class TableSourceScanTestPrograms {
+
+static final Row[] BEFORE_DATA = {
+Row.of(1, 1L, "hi", DateTimeUtils.toLocalDateTime(1586937601000L)),
+Row.of(2, 2L, "hello", DateTimeUtils.toLocalDateTime(1586937602000L)),
+Row.of(3, 2L, "hello world", 
DateTimeUtils.toLocalDateTime(1586937603000L))
+};
+
+static final Row[] AFTER_DATA = {
+Row.of(4, 4L, "foo", DateTimeUtils.toLocalDateTime(1586937614000L)),
+Row.of(5, 2L, "foo bar", 
DateTimeUtils.toLocalDateTime(1586937615000L)),
+};
+
+static final TableTestProgram PROJECT_PUSHDOWN =
+TableTestProgram.of(
+"table-source-scan-project-pushdown",
+"validates table source scan with project 
pushdown")
+.setupTableSource(
+SourceTestStep.newBuilder("source_t")
+.addSchema("a INT", "b BIGINT", "c 
VARCHAR")
+.producedBeforeRestore(BEFORE_DATA)
+.producedAfterRestore(AFTER_DATA)
+.build())
+.setupTableSink(
+SinkTestStep.newBuilder("sink_t")
+.addSchema("a INT", "b VARCHAR")
+.consumedBeforeRestore(
+"+I[1, hi]", "+I[2, hello]", 
"+I[3, hello world]")
+.consumedAfterRestore("+I[4, foo]", "+I[5, 
foo bar]")
+.build())
+.runSql("INSERT INTO sink_t SELECT a, c FROM source_t")
+.build();
+
+static final TableTestProgram PROJECT_PUSHDOWN_DISABLED =
+TableTestProgram.of(
+"table-source-scan-project-push-down-disabled",
+"validates table source scan with project pushdown 
disabled")
+.setupTableSource(
+SourceTestStep.newBuilder("source_t")
+.addSchema(
+"a INT",
+"b BIGINT",
+"c VARCHAR",
+"ts TIMESTAMP(3) METADATA")
+.addOption("readable-metadata", 
"ts:TIMESTAMP(3)")
+.addOption("enable-projection-push-down", 
"false")
+.producedBeforeRestore(BEFORE_DATA)
+.producedAfterRestore(AFTER_DATA)
+.build())
+.setupTableSink(
+SinkTestStep.newBuilder("sink_t")
+.addSchema("a INT", "c VARCHAR")
+.consumedBeforeRestore(
+"+I[1, hi]", "+I[2, hello]", 
"+I[3, hello world]")
+.consumedAfterRestore("+I[4, foo]", "+I[5, 
foo bar]")
+.build())
+.runSql("INSERT INTO sink_t SELECT a, c FROM source_t")
+.build();
+
+static final TableTestProgram FILTER_PUSHDOWN =
+TableTestProgram.of(
+

Re: [PR] [FLINK-33969] Implement restore tests for TableSourceScan node [flink]

2024-01-10 Thread via GitHub


bvarghese1 commented on code in PR #24020:
URL: https://github.com/apache/flink/pull/24020#discussion_r1447721761


##
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java:
##
@@ -198,6 +198,18 @@ public SqlTestStep getRunSqlTestStep() {
 return (SqlTestStep) sqlSteps.get(0);
 }
 
+/**
+ * Convenience method to avoid boilerplate code. It assumes only one 
statement set is tested.
+ */
+public StatementSetTestStep getRunStatementSetTestStep() {
+List statementSetSteps =
+runSteps.stream()
+.filter(s -> s.getKind() == TestKind.STATEMENT_SET)
+.collect(Collectors.toList());
+
+return (StatementSetTestStep) statementSetSteps.get(0);

Review Comment:
   Added a precondition check, similar to the one in `getRunSqlTestStep`.
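
For illustration only, such a check could look roughly like the sketch below. It uses hypothetical stand-in types rather than the real `TableTestProgram` classes, and the exception type and message are assumptions, not the code that was pushed to the PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins for the test-step types; not the real Flink classes.
class StatementSetStepLookup {

    enum TestKind { SQL, STATEMENT_SET }

    static final class Step {
        final TestKind kind;
        final String name;

        Step(TestKind kind, String name) {
            this.kind = kind;
            this.name = name;
        }
    }

    // Returns the single STATEMENT_SET step and fails fast when there is not
    // exactly one, instead of hitting an IndexOutOfBoundsException on get(0)
    // or silently ignoring extra steps.
    static Step getRunStatementSetTestStep(List<Step> runSteps) {
        List<Step> statementSetSteps =
                runSteps.stream()
                        .filter(s -> s.kind == TestKind.STATEMENT_SET)
                        .collect(Collectors.toList());
        if (statementSetSteps.size() != 1) {
            throw new IllegalArgumentException(
                    "Expected exactly one statement set step, but found "
                            + statementSetSteps.size());
        }
        return statementSetSteps.get(0);
    }

    public static void main(String[] args) {
        List<Step> steps =
                Arrays.asList(
                        new Step(TestKind.SQL, "insert"),
                        new Step(TestKind.STATEMENT_SET, "set"));
        System.out.println(getRunStatementSetTestStep(steps).name);
    }
}
```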






Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-01-10 Thread via GitHub


hlteoh37 commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1447650514


##
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkWriter.java:
##
@@ -0,0 +1,267 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier;
+import org.apache.flink.connector.prometheus.sink.prometheus.Remote;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.io.CloseMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xerial.snappy.Snappy;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_SAMPLES_DROPPED;
+import static 
org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_SAMPLES_NON_RETRIABLE_DROPPED;
+import static 
org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_SAMPLES_OUT;
+import static 
org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_SAMPLES_RETRY_LIMIT_DROPPED;
+import static 
org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_WRITE_REQUESTS_OUT;
+import static 
org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_WRITE_REQUESTS_PERMANENTLY_FAILED;
+
+/** Writer, taking care of batching the {@link PrometheusTimeSeries} and 
handling retries. */
+public class PrometheusSinkWriter extends 
AsyncSinkWriter {
+
+/**
+ * * Batching of this sink is in terms of Samples, not bytes. The goal is 
adaptively increase
+ * the number of Samples in each batch, a WriteRequest sent to Prometheus, 
to a configurable
+ * number. This is the parameter maxBatchSizeInBytes.
+ *
+ * getSizeInBytes(requestEntry) returns the number of Samples (not 
bytes) and
+ * maxBatchSizeInBytes is actually in terms of Samples (not bytes).
+ *
+ * In AsyncSinkWriter, maxBatchSize is in terms of requestEntries 
(TimeSeries). But because
+ * each TimeSeries contains 1+ Samples, we set maxBatchSize = 
maxBatchSizeInBytes.
+ *
+ * maxRecordSizeInBytes is also calculated in the same unit assumed by 
getSizeInBytes(..). In
+ * our case is the max number of Samples in a single TimeSeries sent to 
the Sink. We are
+ * limiting the number of Samples in each TimeSeries to the max batch 
size, setting
+ * maxRecordSizeInBytes = maxBatchSizeInBytes.
+ */
+private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusSinkWriter.class);
+
+private final SinkCounters counters;
+private final CloseableHttpAsyncClient asyncHttpClient;
+private final PrometheusRemoteWriteHttpRequestBuilder requestBuilder;
+
+public PrometheusSinkWriter(
+ElementConverter 
elementConverter,
+Sink.InitContext context,
+int maxInFlightRequests,
+int maxBufferedRequests,
+int maxBatchSizeInSamples,
+long maxTimeInBufferMS,
+String prometheusRemoteWriteUrl,
+CloseableHttpAsyncClient asyncHttpClient,
+SinkCounters counters,
+PrometheusRequestSigner requestSigner) {
+this(
+elementConverter,
+context,
+maxInFlightRequests,
+maxBufferedRequests,
+maxBatchSi

Re: [PR] [FLINK-33914][ci] Adds basic Flink CI workflow [flink]

2024-01-10 Thread via GitHub


XComp commented on PR #23970:
URL: https://github.com/apache/flink/pull/23970#issuecomment-1885215058

   I triggered another workflow with lower timeouts 
(https://github.com/XComp/flink/actions/runs/7478072394) to double-check that 
running into timeouts works as expected.





Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-01-10 Thread via GitHub


hlteoh37 commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1447607637


##
amp-request-signer/pom.xml:
##
@@ -0,0 +1,63 @@
+
+
+http://maven.apache.org/POM/4.0.0";
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+4.0.0
+
+org.apache.flink
+flink-prometheus
+1.0.0-SNAPSHOT
+
+
+Flink : Connectors : Prometheus : AMP request signer
+org.apache.flink.connector.prometheus
+amp-request-signer
+
+
+UTF-8
+11
+${target.java.version}
+${target.java.version}
+
+1.16.0

Review Comment:
   Should we consider 1.17, since the supported versions are now 1.17 and 1.18?



##
amp-request-signer/pom.xml:
##
@@ -0,0 +1,63 @@
+
+
+http://maven.apache.org/POM/4.0.0";
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+4.0.0
+
+org.apache.flink
+flink-prometheus
+1.0.0-SNAPSHOT
+
+
+Flink : Connectors : Prometheus : AMP request signer
+org.apache.flink.connector.prometheus
+amp-request-signer
+
+
+UTF-8
+11
+${target.java.version}
+${target.java.version}
+
+1.16.0
+
+
+
+
+org.apache.flink
+flink-connector-prometheus
+${project.version}
+

Review Comment:
   IIUC, this should be `provided`!



##
pom.xml:
##
@@ -0,0 +1,82 @@
+
+
+http://maven.apache.org/POM/4.0.0";
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+4.0.0
+
+org.apache.flink
+flink-prometheus

Review Comment:
   Should we follow the same naming convention as the other connector repos, 
i.e. `flink-connector-prometheus-parent`?



##
amp-request-signer/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java:
##
@@ -0,0 +1,94 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.aws;
+
+import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
+import org.apache.flink.util.Preconditions;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSSessionCredentials;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.util.BinaryUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Map;
+
+/** Sing a Remote-Write request to Amazon Managed Service for Prometheus 
(AMP). */
+public class AmazonManagedPrometheusWriteRequestSigner implements 
PrometheusRequestSigner {
+
+private final URL remoteWriteUrl;
+private final String awsRegion;
+
+/**
+ * Constructor.
+ *
+ * @param remoteWriteUrl URL of the remote-write endpoint
+ * @param awsRegion Region of the AMP workspace
+ */
+public AmazonManagedPrometheusWriteRequestSigner(String remoteWriteUrl, 
String awsRegion) {
+Preconditions.checkArgument(StringUtils.isNotBlank(awsRegion));

Review Comment:
   Should we add a message here in the event of failure?
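
A minimal sketch of what that could look like follows. The `checkArgument` helper below is a local stand-in written for this example so that it runs without Flink on the classpath, and the message text is only a suggestion. In the actual class, the overload of `Preconditions.checkArgument` that takes an error message would play the same role.

```java
// Hypothetical sketch: a local stand-in for a checkArgument(condition, message)
// style precondition, so the example does not depend on Flink or commons-lang.
class RegionArgumentCheck {

    static void checkArgument(boolean condition, String errorMessage) {
        if (!condition) {
            throw new IllegalArgumentException(errorMessage);
        }
    }

    // Rejects null or blank regions with a message that names the parameter.
    static String requireRegion(String awsRegion) {
        checkArgument(
                awsRegion != null && !awsRegion.trim().isEmpty(),
                "awsRegion must not be null or blank");
        return awsRegion;
    }

    public static void main(String[] args) {
        System.out.println(requireRegion("us-east-1"));
        try {
            requireRegion("  ");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a message in place, a misconfigured job fails with a description of which argument was wrong rather than a bare `IllegalArgumentException`.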



##
amp-request-signer/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AWS4SignerBase.java:
##
@@ -0,0 +1,290 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, sof

[jira] [Commented] (FLINK-34007) Flink Job stuck in suspend state after losing leadership in HA Mode

2024-01-10 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805217#comment-17805217
 ] 

Matthias Pohl commented on FLINK-34007:
---

The fix you shared ended up in 6.0.0. That would mean that we shouldn't 
experience the issue in a Flink 1.18 cluster. The fabric8 client was updated to 
6.6.2 in 1.18.0 (FLINK-31997).

> Flink Job stuck in suspend state after losing leadership in HA Mode
> ---
>
> Key: FLINK-34007
> URL: https://issues.apache.org/jira/browse/FLINK-34007
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.3, 1.17.2, 1.18.1, 1.18.2
>Reporter: Zhenqiu Huang
>Priority: Major
> Attachments: job-manager.log
>
>
> The observation is that Job manager goes to suspend state with a failed 
> container not able to register itself to resource manager after timeout.
> JM Log, see attached
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

