Re: [PR] [FLINK-33929][jdbc-connector] Support JDBC String field read Fragment read [flink-connector-jdbc]
MartijnVisser commented on PR #87: URL: https://github.com/apache/flink-connector-jdbc/pull/87#issuecomment-1886531861 @zhilinli123 I still don't understand the Jira ticket, so I can't really review it. Perhaps @snuyanzin or @eskabetxe understand the goal of this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34040) ScalaSerializersMigrationTest.testStableAnonymousClassnameGeneration fails in GHA with JDK 17 and 21
[ https://issues.apache.org/jira/browse/FLINK-34040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805414#comment-17805414 ] Matthias Pohl commented on FLINK-34040: --- Yeah no. [~snuyanzin] also already gave the hint that it might be related to the Scala version that's used in the GitHub workflow profiles. I don't do anything related to changing the Scala version. I have to double-check how we do it in Azure and apply the same for GHA. > ScalaSerializersMigrationTest.testStableAnonymousClassnameGeneration fails in > GHA with JDK 17 and 21 > > > Key: FLINK-34040 > URL: https://issues.apache.org/jira/browse/FLINK-34040 > Project: Flink > Issue Type: Sub-task > Components: API / Scala, Build System / CI >Affects Versions: 1.19.0 >Reporter: Matthias Pohl >Priority: Major > Labels: github-actions, test-stability > > {code} > Error: 13:05:23 13:05:23.538 [ERROR] Tests run: 1, Failures: 1, Errors: 0, > Skipped: 0, Time elapsed: 0.375 s <<< FAILURE! -- in > org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest > Error: 13:05:23 13:05:23.538 [ERROR] > org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest.testStableAnonymousClassnameGeneration > -- Time elapsed: 0.371 s <<< FAILURE! > Jan 07 13:05:23 org.junit.ComparisonFailure: > expected:<...MigrationTest$$anon$[8]> but was:<...MigrationTest$$anon$[1]> > Jan 07 13:05:23 at org.junit.Assert.assertEquals(Assert.java:117) > Jan 07 13:05:23 at org.junit.Assert.assertEquals(Assert.java:146) > Jan 07 13:05:23 at > org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest.testStableAnonymousClassnameGeneration(ScalaSerializersMigrationTest.scala:60) > Jan 07 13:05:23 at > java.base/java.lang.reflect.Method.invoke(Method.java:580) > {code} > The error only happens in the [master GHA > nightly|https://github.com/XComp/flink/actions/workflows/nightly-dev.yml] for > JDK 17 and 21. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-34007) Flink Job stuck in suspend state after losing leadership in HA Mode
[ https://issues.apache.org/jira/browse/FLINK-34007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805411#comment-17805411 ] Matthias Pohl edited comment on FLINK-34007 at 1/11/24 7:36 AM: I still don't fully understand the error you shared: Shouldn't the KubernetesClientException resolve itself because the logic runs in a loop? Is this stacktrace you shared only a one-time thing or does it reoccur (which would confirm the execution in the loop and indicate that the ConfigMap is in some odd state)? Another thing I'm wondering is why the ConfigMap was concurrently updated (which caused the KubernetesClientException as far as I understand) when there's only one JM running. Are there other processes accessing the ConfigMap? {quote} [...] flink will not able to restart services (such RM and dispatcher) as DefaultLeaderRetrievalService is stopped also [...] {quote} The DefaultLeaderRetrievalService is not in charge of restarting any services. The LeaderElectionService will trigger the restart of any shut down services (in our case the SessionDispatcherLeaderProcess which would be started by the DefaultDispatcherRunner; the latter one maintains the Dispatcher's leader election) as soon as the JobManager gets the leadership again. was (Author: mapohl): I still don't fully understand the error you shared: Shouldn't the KubernetesClientException resolve itself because the logic runs in a loop? Is this stacktrace you shared only a one-time thing or does it reoccur (which would confirm the execution in the loop and indicate that the ConfigMap is in some odd state)? Another thing I'm wondering is why the ConfigMap was concurrently updated (which caused the KubernetesClientException as far as I understand) when there's only one JM running. Are there other processes accessing the ConfigMap? {quote} [...] flink will not able to restart services (such RM and dispatcher) as DefaultLeaderRetrievalService is stopped also [...] 
{quote} The DefaultLeaderRetrievalService is not in charge of restarting any services. The LeaderElectionService will trigger the restart of any shut down services (in that case the SessionDispatcherLeaderProcess which would be started by the DefaultDispatcherRunner; the latter one maintains the Dispatcher's leader election) as soon as the JobManager gets the leadership again. > Flink Job stuck in suspend state after losing leadership in HA Mode > --- > > Key: FLINK-34007 > URL: https://issues.apache.org/jira/browse/FLINK-34007 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.3, 1.17.2, 1.18.1, 1.18.2 >Reporter: Zhenqiu Huang >Priority: Major > Attachments: Debug.log, job-manager.log > > > The observation is that Job manager goes to suspend state with a failed > container not able to register itself to resource manager after timeout. > JM Log, see attached >
[jira] [Commented] (FLINK-34007) Flink Job stuck in suspend state after losing leadership in HA Mode
[ https://issues.apache.org/jira/browse/FLINK-34007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805411#comment-17805411 ] Matthias Pohl commented on FLINK-34007: --- I still don't fully understand the error you shared: Shouldn't the KubernetesClientException resolve itself because the logic runs in a loop? Is this stacktrace you shared only a one-time thing or does it reoccur (which would confirm the execution in the loop and indicate that the ConfigMap is in some odd state)? Another thing I'm wondering is why the ConfigMap was concurrently updated (which caused the KubernetesClientException as far as I understand) when there's only one JM running. Are there other processes accessing the ConfigMap? {quote} [...] flink will not able to restart services (such RM and dispatcher) as DefaultLeaderRetrievalService is stopped also [...] {quote} The DefaultLeaderRetrievalService is not in charge of restarting any services. The LeaderElectionService will trigger the restart of any shut down services (in that case the SessionDispatcherLeaderProcess which would be started by the DefaultDispatcherRunner; the latter one maintains the Dispatcher's leader election) as soon as the JobManager gets the leadership again. > Flink Job stuck in suspend state after losing leadership in HA Mode > --- > > Key: FLINK-34007 > URL: https://issues.apache.org/jira/browse/FLINK-34007 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.3, 1.17.2, 1.18.1, 1.18.2 >Reporter: Zhenqiu Huang >Priority: Major > Attachments: Debug.log, job-manager.log > > > The observation is that Job manager goes to suspend state with a failed > container not able to register itself to resource manager after timeout. > JM Log, see attached >
Re: [PR] [FLINK-33879] Avoids the potential hang of Hybrid Shuffle during redistribution [flink]
reswqa commented on code in PR #23957: URL: https://github.com/apache/flink/pull/23957#discussion_r1448417073 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java: ## @@ -385,7 +395,7 @@ private boolean shouldReclaimBuffersBeforeRequesting(long delayForNextCheckMs) { /** Note that this method may be called by the netty thread. */ private void recycleBuffer(Object owner, MemorySegment buffer) { -if (numRequestedBuffers.get() <= bufferPool.getNumBuffers()) { +if (!isReleased.get() && numRequestedBuffers.get() <= bufferPool.getNumBuffers()) { Review Comment: What if the downstream task thread passes this check but has not yet reached the next line, while at the same time the upstream thread runs and completes the `release`? Is it safe enough here 🤔
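The concern raised above is a classic check-then-act race: the `isReleased` check and the subsequent action are two separate steps, so a concurrent `release()` can interleave between them. A minimal, self-contained sketch of one way to make the check and the mutation a single atomic step — hypothetical class and method names, not Flink's actual implementation:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch only: the check on `released` and the enqueue happen under one lock,
// so a release() running concurrently can never strand a buffer in the queue.
final class BufferRecyclerSketch {
    private final Object lock = new Object();
    private final Queue<byte[]> bufferQueue = new ArrayDeque<>();
    private boolean released;

    /** Recycle a buffer; after release() every buffer must bypass the queue. */
    boolean recycle(byte[] buffer) {
        synchronized (lock) {
            if (released) {
                return false; // would go back to the global pool instead (not modeled)
            }
            bufferQueue.offer(buffer); // safe: release() cannot interleave here
            return true;
        }
    }

    /** Release: drain the queue; no recycle() can add to it afterwards. */
    int release() {
        synchronized (lock) {
            released = true;
            int drained = bufferQueue.size();
            bufferQueue.clear();
            return drained;
        }
    }
}

public class RecycleRaceDemo {
    public static void main(String[] args) {
        BufferRecyclerSketch r = new BufferRecyclerSketch();
        System.out.println(r.recycle(new byte[4])); // true: queued before release
        System.out.println(r.release());            // 1: one buffer drained
        System.out.println(r.recycle(new byte[4])); // false: bypasses the queue
    }
}
```

The essential point is only that the flag check and the queue mutation must form one atomic step (via a lock, or by re-checking the flag under the same synchronization that `release` uses); an `AtomicBoolean` alone does not make the two-step sequence atomic.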
Re: [PR] [FLINK-34015][checkpoint] fix that passing ignore-unclaimed-state through dynamic props does not take effect [flink]
xiangforever2014 commented on code in PR #24058: URL: https://github.com/apache/flink/pull/24058#discussion_r1448416625 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java: ## @@ -163,14 +163,16 @@ public static SavepointRestoreSettings forPath( public static void toConfiguration( final SavepointRestoreSettings savepointRestoreSettings, final Configuration configuration) { -configuration.set( -SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, -savepointRestoreSettings.allowNonRestoredState()); -configuration.set( -SavepointConfigOptions.RESTORE_MODE, savepointRestoreSettings.getRestoreMode()); -final String savepointPath = savepointRestoreSettings.getRestorePath(); -if (savepointPath != null) { -configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath); +if (!savepointRestoreSettings.equals(SavepointRestoreSettings.none())) { Review Comment: Yes, I also noticed this behavior, and it seems a little odd: if we pass `SavepointRestoreSettings.none()` to `toConfiguration`, it only writes `SAVEPOINT_IGNORE_UNCLAIMED_STATE` and `RESTORE_MODE` to the configuration; the savepoint path is not written.
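For readers following along, the guard in the diff above can be illustrated with a stripped-down stand-in for the real classes. The mini `RestoreSettingsSketch` below is purely illustrative (the config keys mirror Flink's `execution.savepoint.*` option names, but this is not Flink's API): with the guard, passing the "none" settings writes nothing, so a value set earlier via dynamic properties stays effective instead of being overwritten by defaults.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical simplified settings class, not org.apache.flink.runtime.jobgraph.SavepointRestoreSettings.
final class RestoreSettingsSketch {
    static final RestoreSettingsSketch NONE = new RestoreSettingsSketch(false, null);

    final boolean allowNonRestoredState;
    final String restorePath; // nullable

    RestoreSettingsSketch(boolean allowNonRestoredState, String restorePath) {
        this.allowNonRestoredState = allowNonRestoredState;
        this.restorePath = restorePath;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof RestoreSettingsSketch)) return false;
        RestoreSettingsSketch that = (RestoreSettingsSketch) o;
        return allowNonRestoredState == that.allowNonRestoredState
                && Objects.equals(restorePath, that.restorePath);
    }

    @Override
    public int hashCode() {
        return Objects.hash(allowNonRestoredState, restorePath);
    }

    /** Guarded write: "none" settings do not overwrite anything already in the config. */
    static void toConfiguration(RestoreSettingsSketch s, Map<String, String> config) {
        if (s.equals(NONE)) {
            return; // e.g. -Dexecution.savepoint.ignore-unclaimed-state=true stays effective
        }
        config.put("execution.savepoint.ignore-unclaimed-state",
                Boolean.toString(s.allowNonRestoredState));
        if (s.restorePath != null) {
            config.put("execution.savepoint.path", s.restorePath);
        }
    }
}

public class GuardDemo {
    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("execution.savepoint.ignore-unclaimed-state", "true"); // from -D dynamic property
        RestoreSettingsSketch.toConfiguration(RestoreSettingsSketch.NONE, config);
        System.out.println(config.get("execution.savepoint.ignore-unclaimed-state")); // true
    }
}
```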
Re: [PR] [FLINK-33929][jdbc-connector] Support JDBC String field read Fragment read [flink-connector-jdbc]
zhilinli123 commented on PR #87: URL: https://github.com/apache/flink-connector-jdbc/pull/87#issuecomment-1886457713 PTAL: @MartijnVisser, thanks!
Re: [PR] [FLINK-33879] Avoids the potential hang of Hybrid Shuffle during redistribution [flink]
jiangxin369 commented on code in PR #23957: URL: https://github.com/apache/flink/pull/23957#discussion_r1448384184 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java: ## @@ -187,6 +187,7 @@ public void finish() throws IOException { @Override public void close() { +storageMemoryManager.release(); Review Comment: To make sure the buffers are not recycled to `bufferQueue`, I added a check of `isReleased` state. If the memory manager is released, all buffers would be recycled to the buffer pool.
Re: [PR] [FLINK-33879] Avoids the potential hang of Hybrid Shuffle during redistribution [flink]
jiangxin369 commented on PR #23957: URL: https://github.com/apache/flink/pull/23957#issuecomment-1886429024 @reswqa Thanks for the review, I've updated the PR, please take a look.
Re: [PR] [FLINK-20281][table] support consuming cdc stream about window tvf aggregate [flink]
LadyForest commented on code in PR #24030: URL: https://github.com/apache/flink/pull/24030#discussion_r1448344823 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala: ## @@ -756,6 +756,147 @@ object TestData { row("2020-10-10 00:00:34", 1, 3d, 3f, new JBigDecimal("3.33"), "Comment#3", "b") ) + val windowChangelogDataWithTimestamp: Seq[Row] = List( Review Comment:
```
+----+---------------------+----+------+------+-------+------------+---+
| Op | Timestamp           | 1  | 2    | 3    | 4     | 5          | 6 |
+----+---------------------+----+------+------+-------+------------+---+
| +I | 2020-10-10 00:00:01 | 1  | 1.0  | 1.0  | 1.11  | Hi         | a |
| +I | 2020-10-10 00:00:02 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
| -D | 2020-10-10 00:00:03 | 1  | 1.0  | 1.0  | 1.11  | Hi         | a |
| +I | 2020-10-10 00:00:03 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
| +I | 2020-10-10 00:00:04 | 5  | 5.0  | 5.0  | 5.55  | null       | a |
| -U | 2020-10-10 00:00:04 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
| +U | 2020-10-10 00:00:04 | 22 | 22.0 | 22.2 | 22.22 | Comment#22 | a |
| +I | 2020-10-10 00:00:07 | 3  | 3.0  | 3.0  | null  | Hello      | b |
| +I | 2020-10-10 00:00:06 | 6  | 6.0  | 6.0  | 6.66  | Hi         | b |
| +I | 2020-10-10 00:00:08 | 3  | null | 3.0  | 3.33  | Comment#2  | a |
| +I | 2020-10-10 00:00:04 | 5  | 5.0  | null | 5.55  | Hi         | a |
| +I | 2020-10-10 00:00:16 | 4  | 4.0  | 4.0  | 4.44  | Hi         | b |
| -D | 2020-10-10 00:00:04 | 5  | 5.0  | 5.0  | 5.55  | null       | a |
| +I | 2020-10-10 00:00:38 | 8  | 8.0  | 8.0  | 8.88  | Comment#4  | b |
| -D | 2020-10-10 00:00:39 | 8  | 8.0  | 8.0  | 8.88  | Comment#4  | b |
+----+---------------------+----+------+------+-------+------------+---+
```
Re: [PR] [FLINK-20281][table] support consuming cdc stream about window tvf aggregate [flink]
LadyForest commented on code in PR #24030: URL: https://github.com/apache/flink/pull/24030#discussion_r1448379812 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala: ## @@ -756,6 +756,147 @@ object TestData { row("2020-10-10 00:00:34", 1, 3d, 3f, new JBigDecimal("3.33"), "Comment#3", "b") ) + val windowChangelogDataWithTimestamp: Seq[Row] = List( Review Comment:
```
+----+---------------------+----+------+------+-------+------------+---+
| Op | Timestamp           | 1  | 2    | 3    | 4     | 5          | 6 |
+----+---------------------+----+------+------+-------+------------+---+
| +I | 2020-10-10 00:00:01 | 1  | 1.0  | 1.0  | 1.11  | Hi         | a |
| +I | 2020-10-10 00:00:02 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
| -D | 2020-10-10 00:00:03 | 1  | 1.0  | 1.0  | 1.11  | Hi         | a |
| +I | 2020-10-10 00:00:03 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
| +I | 2020-10-10 00:00:04 | 5  | 5.0  | 5.0  | 5.55  | null       | a |
| -U | 2020-10-10 00:00:04 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
| +U | 2020-10-10 00:00:04 | 22 | 22.0 | 22.2 | 22.22 | Comment#22 | a |
| +I | 2020-10-10 00:00:07 | 3  | 3.0  | 3.0  | null  | Hello      | b |
| +I | 2020-10-10 00:00:06 | 6  | 6.0  | 6.0  | 6.66  | Hi         | b |
| +I | 2020-10-10 00:00:08 | 3  | null | 3.0  | 3.33  | Comment#2  | a |
| +I | 2020-10-10 00:00:04 | 5  | 5.0  | null | 5.55  | Hi         | a |
| +I | 2020-10-10 00:00:16 | 4  | 4.0  | 4.0  | 4.44  | Hi         | b |
| -D | 2020-10-10 00:00:04 | 5  | 5.0  | 5.0  | 5.55  | null       | a |
| +I | 2020-10-10 00:00:38 | 8  | 8.0  | 8.0  | 8.88  | Comment#4  | b |
| -D | 2020-10-10 00:00:39 | 8  | 8.0  | 8.0  | 8.88  | Comment#4  | b |
+----+---------------------+----+------+------+-------+------------+---+
```
[jira] [Commented] (FLINK-33932) Support Retry Mechanism in RocksDBStateDataTransfer
[ https://issues.apache.org/jira/browse/FLINK-33932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805398#comment-17805398 ] xiangyu feng commented on FLINK-33932: -- [~masteryhx] Thanks for your reply. I've asked [~dianer17] to update the description of this issue. Would you kindly assign this issue to me? Also, I would like to hear more from you about this issue in the discussion thread of FLIP-414: [https://lists.apache.org/thread/om4kgd6trx2lctwm6x92q2kdjngxtz9k] > Support Retry Mechanism in RocksDBStateDataTransfer > --- > > Key: FLINK-33932 > URL: https://issues.apache.org/jira/browse/FLINK-33932 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Reporter: Guojun Li >Priority: Major > Labels: pull-request-available > > Currently, there is no retry mechanism for downloading and uploading RocksDB > state files. Any jitter in the remote filesystem might lead to a checkpoint > failure. By supporting a retry mechanism in RocksDBStateDataTransfer, we can > significantly reduce the failure rate of checkpoints during the asynchronous phase. > The exception is as below: > {noformat} > > 2023-12-19 08:46:00,197 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline > checkpoint 2 by task > 5b008c2c048fa8534d648455e69f9497_fbcce2f96483f8f15e87dc6c9afd372f_183_0 of > job a025f19e at > application-6f1c6e3d-1702480803995-5093022-taskmanager-1-1 @ > fdbd:dc61:1a:101:0:0:0:36 (dataPort=38789). > org.apache.flink.util.SerializedThrowable: > org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task > checkpoint failed.
> at > org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320) > ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) > ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > ~[?:?] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > ~[?:?] > at java.lang.Thread.run(Thread.java:829) [?:?] > Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: > Could not materialize checkpoint 2 for operator GlobalGroupAggregate[132] -> > Calc[133] (184/500)#0. > at > org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298) > ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] > ... 4 more > Caused by: org.apache.flink.util.SerializedThrowable: > java.util.concurrent.ExecutionException: java.io.IOException: Could not flush > to file and close the file system output stream to > hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the > stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?] > at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?] 
> at > org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544) > ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] > at > org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) > ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) > ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) > ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] > ... 3 more > Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: > Could not flush to file and close the file system output stream to > hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the > stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:516) > ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] > at > org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:157) > ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] > at > org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:113) > ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] > at > org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.ja
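The ticket describes the retry mechanism only at a high level, so here is an illustrative sketch of the general shape such a retry could take around an upload/download action: bounded attempts with exponential backoff, rethrowing the last failure. This is not the FLIP-414 design; `RetrySketch`, `callWithRetry`, and all parameters are made-up names for illustration.

```java
import java.util.concurrent.Callable;

public class RetrySketch {
    // Retries `action` up to maxAttempts times, doubling the backoff between
    // attempts; if every attempt fails, the last exception is rethrown.
    static <T> T callWithRetry(Callable<T> action, int maxAttempts, long initialBackoffMs)
            throws Exception {
        Exception last = null;
        long backoff = initialBackoffMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e; // e.g. a transient IOException from the remote filesystem
                if (attempt < maxAttempts) {
                    Thread.sleep(backoff);
                    backoff *= 2;
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice with a (simulated) transient error, succeeds on the third try.
        String handle = callWithRetry(() -> {
            if (++calls[0] < 3) throw new java.io.IOException("transient jitter");
            return "state-handle";
        }, 5, 1);
        System.out.println(handle + " after " + calls[0] + " attempts");
    }
}
```

In a real implementation one would likely retry only transient I/O failures (not interruptions or cancellation) and cap the total attempts, so that a genuinely broken filesystem fails the checkpoint promptly instead of stalling it.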
Re: [PR] [FLINK-20281][table] support consuming cdc stream about window tvf aggregate [flink]
LadyForest commented on code in PR #24030: URL: https://github.com/apache/flink/pull/24030#discussion_r1446810962 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala: ## @@ -279,19 +279,19 @@ object AggregateUtil extends Enumeration { typeFactory: FlinkTypeFactory, inputRowType: RowType, aggCalls: Seq[AggregateCall], + needRetraction: Boolean, windowSpec: WindowSpec, isStateBackendDataViews: Boolean): AggregateInfoList = { -// Hopping window requires additional COUNT(*) to determine whether to register next timer -// through whether the current fired window is empty, see SliceSharedWindowAggProcessor. -val needInputCount = windowSpec.isInstanceOf[HoppingWindowSpec] +// Hopping window always requires additional COUNT(*) to determine whether to register next Review Comment: Nit: remove extra whitespace between "determine" and "whether" ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala: ## @@ -219,8 +219,13 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti val providedTrait = new ModifyKindSetTrait(builder.build()) createNewNode(window, children, providedTrait, requiredTrait, requester) - case _: StreamPhysicalWindowAggregate | _: StreamPhysicalWindowRank | - _: StreamPhysicalWindowDeduplicate => + case window: StreamPhysicalWindowAggregate => +// WindowAggregate and WindowTableAggregate support all changes in input +val children = visitChildren(window, ModifyKindSetTrait.ALL_CHANGES) +val providedTrait = new ModifyKindSetTrait(ModifyKindSet.INSERT_ONLY) Review Comment: Nit: Add a TODO to mark the provided trait set can be extended once we support emit strategy using the TVF syntax ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala: ## @@ -127,6 +162,26 @@ class WindowAggregateTest(aggPhaseEnforcer: 
AggregatePhaseStrategy) extends Tabl util.verifyRelPlan(sql) } + @TestTemplate + def testTumble_OnProctimeWithCDCSource(): Unit = { +assumeThat(isTwoPhase).isTrue Review Comment: Is there any particular reason to enable only the two-phase test? ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java: ## @@ -231,4 +239,33 @@ protected void collect(RowData aggResult) { reuseOutput.replace(ctx.getKeyedStateBackend().getCurrentKey(), aggResult); ctx.output(reuseOutput); } + +/** A supplier that returns whether the window is empty. */ +protected final class WindowIsEmptySupplier implements Supplier<Boolean>, Serializable { +private static final long serialVersionUID = 1L; + +private final int indexOfCountStar; + +private WindowIsEmptySupplier(int indexOfCountStar, SliceAssigner assigner) { +if (assigner instanceof SliceAssigners.HoppingSliceAssigner) { +checkArgument( +indexOfCountStar >= 0, +"Hopping window requires a COUNT(*) in the aggregate functions."); +} +this.indexOfCountStar = indexOfCountStar; +} + +@Override +public Boolean get() { +if (indexOfCountStar < 0) { Review Comment: I saw there is a precondition check on `indexOfCountStar`. Under which condition might it be negative?
## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala: ## @@ -756,6 +756,147 @@ object TestData { row("2020-10-10 00:00:34", 1, 3d, 3f, new JBigDecimal("3.33"), "Comment#3", "b") ) + val windowChangelogDataWithTimestamp: Seq[Row] = List( Review Comment:
```
+----+---------------------+----+------+------+-------+------------+---+
| Op | Timestamp           | 1  | 2    | 3    | 4     | 5          | 6 |
+----+---------------------+----+------+------+-------+------------+---+
| +I | 2020-10-10 00:00:01 | 1  | 1.0  | 1.0  | 1.11  | Hi         | a |
| +I | 2020-10-10 00:00:02 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
| -D | 2020-10-10 00:00:03 | 1  | 1.0  | 1.0  | 1.11  | Hi         | a |
| +I | 2020-10-10 00:00:03 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
| +I | 2020-10-10 00:00:04 | 5  | 5.0  | 5.0  | 5.55  | null       | a |
| -U | 2020-10-10 00:00:04 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
| +U | 2020-10-10 00:00:04 | 22 | 22.0 | 22.2 | 22.22 | Comment#22 | a |
| +I | 2020-10-10 00:00:07 | 3  | 3.0  | 3.0  | null  | Hello      | b |
| +I | 2020-10-10 00:00:06 | 6  | 6.0  | 6.0  | 6.66  | Hi         | b |
| +I | 2020-10-10 00:00:08 | 3  | null | 3.0  | 3.33  | Comment#2  | a |
| +I | 2020-10-10 00:00:04 | 5  | 5.0  | null | 5.55  | Hi         | a |
| +I | 2020-10-10 00:00:16 | 4  | 4.0  | 4.0  | 4.44  | Hi         | b |
| +I |
```
[jira] [Commented] (FLINK-34050) Rocksdb state has space amplification after rescaling with DeleteRange
[ https://issues.apache.org/jira/browse/FLINK-34050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805392#comment-17805392 ] Yue Ma commented on FLINK-34050: Hi [~lijinzhong], thanks for reporting this issue. We have also encountered it before, and I think this is a great suggestion. Overall, this is still a trade-off between time and space. If recovery time matters most, we can use deleteRange. If we want both good recovery time and limited space amplification, we can use deleteRange + deleteFilesInRanges. If space amplification is the main concern, we can consider deleteRange + deleteFilesInRanges + CompactRanges. (Of course, perhaps we can also look into other ways of turning space reclamation into an asynchronous process.) > Rocksdb state has space amplification after rescaling with DeleteRange > -- > > Key: FLINK-34050 > URL: https://issues.apache.org/jira/browse/FLINK-34050 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Jinzhong Li >Priority: Major > Attachments: image-2024-01-10-21-23-48-134.png, > image-2024-01-10-21-24-10-983.png, image-2024-01-10-21-28-24-312.png > > > FLINK-21321 uses deleteRange to speed up rocksdb rescaling, however it will > cause space amplification in some cases. > We can reproduce this problem using the wordCount job: > 1) before rescaling, the state operator in the wordCount job has parallelism 2 and > a 4G+ full checkpoint size; > !image-2024-01-10-21-24-10-983.png|width=266,height=130! > 2) then restart the job with parallelism 4 (for the state operator); the full > checkpoint size of the new job will be 8G+ ; > 3) after many successful checkpoints, the full checkpoint size is still 8G+; > !image-2024-01-10-21-28-24-312.png|width=454,height=111!
> > The root cause of this issue is that the deleted keyGroupRange does not > overlap with the current DB keyGroupRange, so new data written into rocksdb after > rescaling is almost never compacted by the LSM together with the deleted data (belonging to > other keyGroupRanges). > > And the space amplification may affect Rocksdb read performance and disk > space usage after rescaling. It looks like a regression due to the > introduction of deleteRange for rescaling optimization. > > To solve this problem, I think maybe we can invoke > Rocksdb.deleteFilesInRanges after deleteRange? > {code:java} > public static void clipDBWithKeyGroupRange() { > //... > List<byte[]> ranges = new ArrayList<>(); > //... > deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes); > ranges.add(beginKeyGroupBytes); > ranges.add(endKeyGroupBytes); > // > for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { > db.deleteFilesInRanges(columnFamilyHandle, ranges, false); > } > } > {code} >
Re: [PR] [FLINK-33946] [rocksdb] set AvoidFlushDuringShutdown to true for avoiding Flush when Cancel Task [flink]
flinkbot commented on PR #24069: URL: https://github.com/apache/flink/pull/24069#issuecomment-1886378530 ## CI report: * 556acc2a010b25284766bb9eeed2e18f2a8c2eef UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-33946) RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel
[ https://issues.apache.org/jira/browse/FLINK-33946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33946: --- Labels: pull-request-available (was: ) > RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel > > > Key: FLINK-33946 > URL: https://issues.apache.org/jira/browse/FLINK-33946 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.19.0 >Reporter: Yue Ma >Assignee: Yue Ma >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > When a Job fails, the task needs to be canceled and re-deployed. > RocksDBStatebackend will call RocksDB.close when disposing. > {code} > if (!shutting_down_.load(std::memory_order_acquire) && > has_unpersisted_data_.load(std::memory_order_relaxed) && > !mutable_db_options_.avoid_flush_during_shutdown) { > if (immutable_db_options_.atomic_flush) { > autovector<ColumnFamilyData*> cfds; > SelectColumnFamiliesForAtomicFlush(&cfds); > mutex_.Unlock(); > Status s = > AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown); > s.PermitUncheckedError(); //**TODO: What to do on error? > mutex_.Lock(); > } else { > for (auto cfd : *versions_->GetColumnFamilySet()) { > if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) { > cfd->Ref(); > mutex_.Unlock(); > Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown); > s.PermitUncheckedError(); //**TODO: What to do on error? > mutex_.Lock(); > cfd->UnrefAndTryDelete(); > } > } > } {code} > By default (avoid_flush_during_shutdown=false) RocksDB flushes the memtable on Close. When the disk pressure is high or the memtable is large, this > process will be more time-consuming, which will cause the Task to get stuck > in the Canceling stage and slow down job failover. > In fact, it is completely unnecessary to flush the memtable when a Flink Task is > closed, because the data can be replayed from the checkpoint.
So we can set > avoid_flush_during_shutdown to true to speed up Task Failover
[PR] [FLINK-33946] [rocksdb] set AvoidFlushDuringShutdown to true for avoiding Flush when Cancel Task [flink]
mayuehappy opened a new pull request, #24069: URL: https://github.com/apache/flink/pull/24069 Set setAvoidFlushDuringShutdown to true to avoid a flush when canceling a Task ## What is the purpose of the change Set setAvoidFlushDuringShutdown to true to avoid a flush when canceling a Task ## Brief change log Set AvoidFlushDuringShutdown to true in the default RocksDB options ## Verifying this change Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework This change is already covered by existing tests This change added tests and can be verified as follows: ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: flink-statebackend-rocksdb - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no)
Re: [PR] [FLINK-34051][checkpoint] Fix equals/hashCode/toString for SavepointRestoreSettings [flink]
masteryhx commented on code in PR #24066: URL: https://github.com/apache/flink/pull/24066#discussion_r1448341514 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java: ## @@ -105,12 +105,14 @@ public boolean equals(Object o) { SavepointRestoreSettings that = (SavepointRestoreSettings) o; return allowNonRestoredState == that.allowNonRestoredState -&& (Objects.equals(restorePath, that.restorePath)); +&& Objects.equals(restorePath, that.restorePath) +&& Objects.equals(restoreMode, that.restoreMode); } @Override public int hashCode() { int result = restorePath != null ? restorePath.hashCode() : 0; +result = 31 * result + (restoreMode != null ? restoreMode.hashCode() : 0); Review Comment: Thanks for the suggestion, I think it's reasonable to mark it as '@ Nonnull' -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
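A minimal, self-contained sketch of the invariant this PR restores — `restoreMode` participating in both `equals` and `hashCode`, so equal settings hash equally. The enum and field names are simplified stand-ins for the real Flink classes:

```java
import java.util.Objects;

public class SavepointRestoreSettingsSketch {
    enum RestoreMode { CLAIM, NO_CLAIM, LEGACY }

    static final class Settings {
        final String restorePath; // may be null
        final boolean allowNonRestoredState;
        final RestoreMode restoreMode;

        Settings(String path, boolean allow, RestoreMode mode) {
            this.restorePath = path;
            this.allowNonRestoredState = allow;
            this.restoreMode = mode;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Settings)) return false;
            Settings that = (Settings) o;
            return allowNonRestoredState == that.allowNonRestoredState
                    && Objects.equals(restorePath, that.restorePath)
                    && Objects.equals(restoreMode, that.restoreMode); // previously missing
        }

        @Override
        public int hashCode() {
            int result = restorePath != null ? restorePath.hashCode() : 0;
            // Keep in sync with equals: restoreMode must contribute here too.
            result = 31 * result + (restoreMode != null ? restoreMode.hashCode() : 0);
            result = 31 * result + (allowNonRestoredState ? 1 : 0);
            return result;
        }
    }

    public static void main(String[] args) {
        Settings a = new Settings("s3://sp/1", false, RestoreMode.CLAIM);
        Settings b = new Settings("s3://sp/1", false, RestoreMode.NO_CLAIM);
        System.out.println(a.equals(b)); // prints "false": the settings differ only in restoreMode
    }
}
```

Before the fix, two settings differing only in restore mode compared equal, which is exactly the kind of bug that let FLINK-34015's dynamic-property value be silently dropped.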
[jira] [Resolved] (FLINK-33881) [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull
[ https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu resolved FLINK-33881. -- Resolution: Fixed Thanks [~lijinzhong] for the great work! merged 907d0f32 into master > [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull > > > Key: FLINK-33881 > URL: https://issues.apache.org/jira/browse/FLINK-33881 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Jinzhong Li >Assignee: Jinzhong Li >Priority: Minor > Labels: pull-request-available > Attachments: image-2023-12-19-21-25-21-446.png, > image-2023-12-19-21-26-43-518.png > > > In some scenarios, 'TtlListState#getUnexpiredOrNull -> > elementSerializer.copy(ttlValue)' consumes a lot of CPU resources. > !image-2023-12-19-21-25-21-446.png|width=529,height=119! > I found that for TtlListState#getUnexpiredOrNull, if none of the elements > have expired, it still needs to copy all the elements and update the whole > list/map in TtlIncrementalCleanup#runCleanup(); > !image-2023-12-19-21-26-43-518.png|width=505,height=266! > I think we could optimize TtlListState#getUnexpiredOrNull by: > 1) finding the first expired element index in the list; > 2) if none is found, returning the original list; > 3) if one is found, constructing the unexpired list (put the preceding elements into > it), then going through the subsequent elements and adding the unexpired ones. > {code:java} > public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>> > ttlValues) { > //... 
> int firstExpireIndex = -1; > for (int i = 0; i < ttlValues.size(); i++) { > if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) { > firstExpireIndex = i; > break; > } > } > if (firstExpireIndex == -1) { > return ttlValues; //return the original ttlValues > } > List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size()); > for (int i = 0; i < ttlValues.size(); i++) { > if (i < firstExpireIndex) { > // unexpired.add(ttlValues.get(i)); > unexpired.add(elementSerializer.copy(ttlValues.get(i))); > } > if (i > firstExpireIndex) { > if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) { > // unexpired.add(ttlValues.get(i)); > unexpired.add(elementSerializer.copy(ttlValues.get(i))); > } > } > } > // . > } {code} > *In this way, the extra iteration overhead is actually very small, but > the benefit when there are no expired elements is significant.* -- This message was sent by Atlassian Jira (v8.20.10#820010)
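The proposal above can be sketched in a self-contained form — `TtlValue` and the expiry check are simplified stand-ins for Flink's internals, and plain references are used in place of `elementSerializer.copy`:

```java
import java.util.ArrayList;
import java.util.List;

public class TtlListSketch {
    static final class TtlValue<T> {
        final T value;
        final long lastAccessTs;
        TtlValue(T value, long ts) { this.value = value; this.lastAccessTs = ts; }
    }

    static boolean expired(TtlValue<?> v, long ttl, long now) {
        return v.lastAccessTs + ttl <= now;
    }

    /** Returns the original list instance unchanged when nothing has expired. */
    static <T> List<TtlValue<T>> getUnexpiredOrNull(List<TtlValue<T>> ttlValues, long ttl, long now) {
        int firstExpired = -1;
        for (int i = 0; i < ttlValues.size(); i++) {
            if (expired(ttlValues.get(i), ttl, now)) { firstExpired = i; break; }
        }
        if (firstExpired == -1) {
            return ttlValues; // fast path: no copy, no state update needed
        }
        List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
        // Everything before the first expired element is known to be unexpired.
        unexpired.addAll(ttlValues.subList(0, firstExpired));
        // Check the rest individually, skipping expired entries.
        for (int i = firstExpired + 1; i < ttlValues.size(); i++) {
            if (!expired(ttlValues.get(i), ttl, now)) {
                unexpired.add(ttlValues.get(i));
            }
        }
        return unexpired.isEmpty() ? null : unexpired;
    }
}
```

The fast path is the point of the optimization: returning the same list instance signals the cleanup code that nothing changed, so no serializer copies and no state write occur.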
Re: [PR] [FLINK-33881]Avoid copy and update value in TtlListState#getUnexpiredOrNull [flink]
masteryhx closed pull request #24057: [FLINK-33881]Avoid copy and update value in TtlListState#getUnexpiredOrNull URL: https://github.com/apache/flink/pull/24057 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33881]Avoid copy and update value in TtlListState#getUnexpiredOrNull [flink]
masteryhx commented on PR #24057: URL: https://github.com/apache/flink/pull/24057#issuecomment-1886347612 Merged. Just a reminder: remember to write the commit message with a component tag next time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34049][table]Refactor classes related to window TVF aggregation to prepare for non-aligned windows [flink]
flinkbot commented on PR #24068: URL: https://github.com/apache/flink/pull/24068#issuecomment-1886331556 ## CI report: * fd074024357c148202e3ff3e218309480ba12569 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-34015) Setting `execution.savepoint.ignore-unclaimed-state` does not take effect when passing this parameter by dynamic properties
[ https://issues.apache.org/jira/browse/FLINK-34015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804983#comment-17804983 ] Renxiang Zhou edited comment on FLINK-34015 at 1/11/24 5:48 AM: [~masteryhx] Hello, hangxiang, please have a look at this issue if you have time, many thanks~ And I have implemented a fix for this little bug. https://github.com/apache/flink/pull/24058 was (Author: JIRAUSER295459): [~masteryhx] Hello, hangxiang, please have a look at this issue if you have time, many thanks~ And I have implemented a fix for this little bug. [https://github.com/apache/flink/pull/24058|https://github.com/apache/flink/pull/24058,] > Setting `execution.savepoint.ignore-unclaimed-state` does not take effect > when passing this parameter by dynamic properties > --- > > Key: FLINK-34015 > URL: https://issues.apache.org/jira/browse/FLINK-34015 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.17.0 >Reporter: Renxiang Zhou >Assignee: Renxiang Zhou >Priority: Critical > Labels: ignore-unclaimed-state-invalid, pull-request-available > Attachments: image-2024-01-08-14-22-09-758.png, > image-2024-01-08-14-24-30-665.png > > > We set `execution.savepoint.ignore-unclaimed-state` to true and use the -D option > to submit the job, but unfortunately we found the value is still false in the > jobmanager log. > Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true when > submitting the job. > !image-2024-01-08-14-22-09-758.png|width=1012,height=222! > Pic 2: The value is still false in the JM log. > !image-2024-01-08-14-24-30-665.png|width=651,height=51! > > Besides, the parameter `execution.savepoint-restore-mode` has the same > problem when we pass it by the -D option. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34015) Setting `execution.savepoint.ignore-unclaimed-state` does not take effect when passing this parameter by dynamic properties
[ https://issues.apache.org/jira/browse/FLINK-34015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805378#comment-17805378 ] Renxiang Zhou commented on FLINK-34015: --- [~masteryhx] Many thanks to you, I will check it~(y) > Setting `execution.savepoint.ignore-unclaimed-state` does not take effect > when passing this parameter by dynamic properties > --- > > Key: FLINK-34015 > URL: https://issues.apache.org/jira/browse/FLINK-34015 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.17.0 >Reporter: Renxiang Zhou >Assignee: Renxiang Zhou >Priority: Critical > Labels: ignore-unclaimed-state-invalid, pull-request-available > Attachments: image-2024-01-08-14-22-09-758.png, > image-2024-01-08-14-24-30-665.png > > > We set `execution.savepoint.ignore-unclaimed-state` to true and use -D option > to submit the job, but unfortunately we found the value is still false in > jobmanager log. > Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true in > submiting job. > !image-2024-01-08-14-22-09-758.png|width=1012,height=222! > Pic 2: The value is still false in jmlog. > !image-2024-01-08-14-24-30-665.png|width=651,height=51! > > Besides, the parameter `execution.savepoint-restore-mode` has the same > problem since when we pass it by -D option. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34049) Refactor classes related to window TVF aggregation to prepare for non-aligned windows
[ https://issues.apache.org/jira/browse/FLINK-34049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34049: --- Labels: pull-request-available (was: ) > Refactor classes related to window TVF aggregation to prepare for non-aligned > windows > - > > Key: FLINK-34049 > URL: https://issues.apache.org/jira/browse/FLINK-34049 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Assignee: xuyang >Priority: Major > Labels: pull-request-available > > Refactor classes related to window TVF aggregation such as > AbstractWindowAggProcessor to prepare for the implementation of non-aligned > windows like session window -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-34049][table]Refactor classes related to window TVF aggregation to prepare for non-aligned windows [flink]
xuyangzhong opened a new pull request, #24068: URL: https://github.com/apache/flink/pull/24068 ## What is the purpose of the change Currently, the implementation of the window aggregate op with the new WINDOW TVF syntax is strongly bound to slicing windows such as TUMBLE, HOP and CUMULATE. This brings difficulties to the introduction of unslicing windows such as SESSION. This pr aims to refactor some classes about the window aggregate op to prepare for the introduction of unslicing windows. ## Brief change log - *add two packages, 'groupwindow' and 'windowtvf', under 'org.apache.flink.table.runtime.operators.window' to separate the implementations of the legacy group window agg and the new window TVF agg* - *move the related classes into these two new packages* - *extract abstract classes for the slicing window processor and operator* ## Verifying this change Existing tests cover these changes. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33325) FLIP-375: Built-in cross-platform powerful java profiler
[ https://issues.apache.org/jira/browse/FLINK-33325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805374#comment-17805374 ] Yu Chen commented on FLINK-33325: - Hi [~Zhanghao Chen] , thanks for your attention, the feature will be available to everyone soon! > FLIP-375: Built-in cross-platform powerful java profiler > > > Key: FLINK-33325 > URL: https://issues.apache.org/jira/browse/FLINK-33325 > Project: Flink > Issue Type: Improvement > Components: Runtime / REST, Runtime / Web Frontend >Affects Versions: 1.19.0 >Reporter: Yu Chen >Assignee: Yu Chen >Priority: Major > > This is an umbrella JIRA of > [FLIP-375|https://cwiki.apache.org/confluence/x/64lEE] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33337) Expose IngestDB and ClipDB in the official RocksDB API
[ https://issues.apache.org/jira/browse/FLINK-33337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yue Ma updated FLINK-33337: --- Attachment: image-2024-01-11-12-03-14-308.png > Expose IngestDB and ClipDB in the official RocksDB API > -- > > Key: FLINK-33337 > URL: https://issues.apache.org/jira/browse/FLINK-33337 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Piotr Nowojski >Assignee: Yue Ma >Priority: Major > Attachments: image-2024-01-11-12-03-14-308.png > > > Remaining open PRs: > None :) > Already merged PRs: > https://github.com/facebook/rocksdb/pull/11646 > https://github.com/facebook/rocksdb/pull/11868 > https://github.com/facebook/rocksdb/pull/11811 > https://github.com/facebook/rocksdb/pull/11381 > https://github.com/facebook/rocksdb/pull/11379 > https://github.com/facebook/rocksdb/pull/11378 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33337) Expose IngestDB and ClipDB in the official RocksDB API
[ https://issues.apache.org/jira/browse/FLINK-33337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805364#comment-17805364 ] Yue Ma commented on FLINK-33337: Update: https://github.com/facebook/rocksdb/pull/12219 This PR fixes a bug that may cause incorrect ClipDB results > Expose IngestDB and ClipDB in the official RocksDB API > -- > > Key: FLINK-33337 > URL: https://issues.apache.org/jira/browse/FLINK-33337 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Piotr Nowojski >Assignee: Yue Ma >Priority: Major > Attachments: image-2024-01-11-12-03-14-308.png > > > Remaining open PRs: > None :) > Already merged PRs: > https://github.com/facebook/rocksdb/pull/11646 > https://github.com/facebook/rocksdb/pull/11868 > https://github.com/facebook/rocksdb/pull/11811 > https://github.com/facebook/rocksdb/pull/11381 > https://github.com/facebook/rocksdb/pull/11379 > https://github.com/facebook/rocksdb/pull/11378 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34021) Print jobKey in the Autoscaler standalone log
[ https://issues.apache.org/jira/browse/FLINK-34021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805363#comment-17805363 ] Rui Fan commented on FLINK-34021: - Merged to main(1.8.0) via : f67cba9c8269ddd4e27fcdc40cfd8a293ae665de > Print jobKey in the Autoscaler standalone log > - > > Key: FLINK-34021 > URL: https://issues.apache.org/jira/browse/FLINK-34021 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler >Affects Versions: kubernetes-operator-1.8.0 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > FLINK-33814 has supported the multiple threads scaling for autoscaler > standalone. When a lot of jobs are scaling, autoscaler standalone will print > too many logs. Currently, each log doesn't have the job key, it's hard to > maintain. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-34021) Print jobKey in the Autoscaler standalone log
[ https://issues.apache.org/jira/browse/FLINK-34021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan resolved FLINK-34021. - Resolution: Fixed > Print jobKey in the Autoscaler standalone log > - > > Key: FLINK-34021 > URL: https://issues.apache.org/jira/browse/FLINK-34021 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler >Affects Versions: kubernetes-operator-1.8.0 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > FLINK-33814 has supported the multiple threads scaling for autoscaler > standalone. When a lot of jobs are scaling, autoscaler standalone will print > too many logs. Currently, each log doesn't have the job key, it's hard to > maintain. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34021][autoscaler] Print jobKey in the Autoscaler standalone log [flink-kubernetes-operator]
1996fanrui merged PR #750: URL: https://github.com/apache/flink-kubernetes-operator/pull/750 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34021][autoscaler] Print jobKey in the Autoscaler standalone log [flink-kubernetes-operator]
1996fanrui commented on PR #750: URL: https://github.com/apache/flink-kubernetes-operator/pull/750#issuecomment-1886189083 Thanks @mxm for the review, merging~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-34059) Add documentation on how to use state TTL hint
Jane Chan created FLINK-34059: - Summary: Add documentation on how to use state TTL hint Key: FLINK-34059 URL: https://issues.apache.org/jira/browse/FLINK-34059 Project: Flink Issue Type: Sub-task Components: Documentation, Table SQL / API Affects Versions: 1.19.0 Reporter: Jane Chan -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]
1996fanrui commented on code in PR #24003: URL: https://github.com/apache/flink/pull/24003#discussion_r1448255898 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java: ## @@ -367,17 +367,13 @@ private void restartTasksWithDelay(final FailureHandlingResult failureHandlingRe final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart); -final FailureHandlingResultSnapshot failureHandlingResultSnapshot = -createFailureHandlingResultSnapshot(failureHandlingResult); +archiveFromFailureHandlingResult( +createFailureHandlingResultSnapshot(failureHandlingResult)); delayExecutor.schedule( () -> FutureUtils.assertNoException( cancelFuture.thenRunAsync( -() -> { -archiveFromFailureHandlingResult( Review Comment: Thanks for the clarification! The initial motivation might be to collect all concurrent exceptions. In this PR, the solution is to save the `latestRootExceptionEntry` as a field in `SchedulerBase`, and all subsequent non-root exceptions will be added to the `latestRootExceptionEntry`. ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java: ## @@ -140,15 +142,20 @@ private static RootExceptionHistoryEntry createRootExceptionHistoryEntry( failureLabels, failingTaskName, taskManagerLocation, -StreamSupport.stream(executions.spliterator(), false) .filter(execution -> execution.getFailureInfo().isPresent()) .map( execution -> ExceptionHistoryEntry.create( execution, execution.getVertexWithAttempt(), FailureEnricherUtils.EMPTY_FAILURE_LABELS)) .collect(Collectors.toList())); +createExceptionHistoryEntries(executions)); +} + +public static List<ExceptionHistoryEntry> createExceptionHistoryEntries( Review Comment: > we could move the logic into addConcurrentExceptions and call addConcurrentExceptions within createRootExceptionHistoryEntry on the newly created instance. 
It makes sense to me, thanks~ ## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java: ## @@ -206,6 +214,11 @@ public boolean isGlobalFailure() { return globalFailure; } +/** @return Whether this failure is a new attempt. */ +public boolean isNewAttempt() { Review Comment: I added some comments to explain it. What do you think? ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java: ## @@ -96,7 +98,7 @@ public static RootExceptionHistoryEntry fromGlobalFailure( } public static RootExceptionHistoryEntry fromExceptionHistoryEntry( -ExceptionHistoryEntry entry, Iterable<ExceptionHistoryEntry> entries) { +ExceptionHistoryEntry entry, List<ExceptionHistoryEntry> entries) { Review Comment: Change it to `Collection`. We need to merge more exceptions into the `concurrentExceptions`, and `Iterable` doesn't support modification. So changing it to `Collection`. ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java: ## @@ -703,27 +707,41 @@ private void archiveGlobalFailure( long timestamp, CompletableFuture<Map<String, String>> failureLabels, Iterable<Execution> executions) { -exceptionHistory.add( +latestRootExceptionEntry = RootExceptionHistoryEntry.fromGlobalFailure( -failure, timestamp, failureLabels, executions)); +failure, timestamp, failureLabels, executions); +exceptionHistory.add(latestRootExceptionEntry); log.debug("Archive global failure.", failure); } protected final void archiveFromFailureHandlingResult( FailureHandlingResultSnapshot failureHandlingResult) { +// All exceptions are treated as concurrent exceptions when it's not a new attempt. 
+if (!failureHandlingResult.isNewAttempt()) { +checkState(latestRootExceptionEntry != null, "It should have old failure."); +List<Execution> concurrentlyExecutions = new LinkedList<>(); + failureHandlingResult.getRootCauseExecution().ifPresent(concurrentlyExecutions::add); + concurrentlyExecutions.addAll(failureHandlingResult.getConcurrentlyFailedExecution()); Review Comment: Thanks for the detailed review! Good catch, I changed it to the `ArrayList`.
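The merging strategy discussed in this thread — keep the latest root entry as a field and attach subsequent non-root failures to it as concurrent exceptions — can be sketched with simplified stand-ins for the scheduler's classes:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-ins for Flink's exception-history classes, to illustrate the merging strategy.
public class ExceptionHistorySketch {
    static final class RootEntry {
        final Throwable rootCause;
        final List<Throwable> concurrentExceptions = new ArrayList<>();
        RootEntry(Throwable rootCause) { this.rootCause = rootCause; }
    }

    private final List<RootEntry> exceptionHistory = new ArrayList<>();
    // Kept as a field, like latestRootExceptionEntry in SchedulerBase.
    private RootEntry latestRootEntry;

    void archive(Throwable failure, boolean isNewAttempt) {
        if (isNewAttempt || latestRootEntry == null) {
            // A new restart attempt opens a new root entry in the history.
            latestRootEntry = new RootEntry(failure);
            exceptionHistory.add(latestRootEntry);
        } else {
            // Not a new attempt: merge into the latest root as a concurrent exception.
            latestRootEntry.concurrentExceptions.add(failure);
        }
    }

    List<RootEntry> history() { return exceptionHistory; }
}
```

This keeps the history list short (one entry per restart attempt) while still recording every failure observed between attempts; the real implementation additionally snapshots failure labels and task locations per entry.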
[jira] [Assigned] (FLINK-33819) Support setting CompressType in RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-33819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu reassigned FLINK-33819: Assignee: Yue Ma > Support setting CompressType in RocksDBStateBackend > --- > > Key: FLINK-33819 > URL: https://issues.apache.org/jira/browse/FLINK-33819 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Yue Ma >Assignee: Yue Ma >Priority: Major > Fix For: 1.19.0 > > Attachments: image-2023-12-14-11-32-32-968.png, > image-2023-12-14-11-35-22-306.png > > > Currently, RocksDBStateBackend does not support setting the compression > level, and Snappy is used for compression by default. But we have some > scenarios where compression will use a lot of CPU resources. Turning off > compression can significantly reduce CPU overhead. So we may need to support > a parameter for users to set the CompressType of Rocksdb. > !image-2023-12-14-11-35-22-306.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-33946) RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel
[ https://issues.apache.org/jira/browse/FLINK-33946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu reassigned FLINK-33946: Assignee: Yue Ma (was: Hangxiang Yu) > RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel > > > Key: FLINK-33946 > URL: https://issues.apache.org/jira/browse/FLINK-33946 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.19.0 >Reporter: Yue Ma >Assignee: Yue Ma >Priority: Major > Fix For: 1.19.0 > > > When a Job fails, the task needs to be canceled and re-deployed. > RocksDBStatebackend will call RocksDB.close when disposing. > {code:java} > if (!shutting_down_.load(std::memory_order_acquire) && > has_unpersisted_data_.load(std::memory_order_relaxed) && > !mutable_db_options_.avoid_flush_during_shutdown) { > if (immutable_db_options_.atomic_flush) { > autovector cfds; > SelectColumnFamiliesForAtomicFlush(&cfds); > mutex_.Unlock(); > Status s = > AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown); > s.PermitUncheckedError(); //**TODO: What to do on error? > mutex_.Lock(); > } else { > for (auto cfd : *versions_->GetColumnFamilySet()) { > if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) { > cfd->Ref(); > mutex_.Unlock(); > Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown); > s.PermitUncheckedError(); //**TODO: What to do on error? > mutex_.Lock(); > cfd->UnrefAndTryDelete(); > } > } > } {code} > By default (avoid_flush_during_shutdown=false) RocksDb requires FlushMemtable > when Close. When the disk pressure is high or the Memtable is large, this > process will be more time-consuming, which will cause the Task to get stuck > in the Canceling stage and affect the speed of job Failover. > In fact, it is completely unnecessary to Flush memtable when Flink Task is > Close, because the data can be replayed from Checkpoint. 
So we can set > avoid_flush_during_shutdown to true to speed up Task Failover -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-33946) RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel
[ https://issues.apache.org/jira/browse/FLINK-33946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu reassigned FLINK-33946: Assignee: Hangxiang Yu > RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel > > > Key: FLINK-33946 > URL: https://issues.apache.org/jira/browse/FLINK-33946 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.19.0 >Reporter: Yue Ma >Assignee: Hangxiang Yu >Priority: Major > Fix For: 1.19.0 > > > When a Job fails, the task needs to be canceled and re-deployed. > RocksDBStatebackend will call RocksDB.close when disposing. > {code:java} > if (!shutting_down_.load(std::memory_order_acquire) && > has_unpersisted_data_.load(std::memory_order_relaxed) && > !mutable_db_options_.avoid_flush_during_shutdown) { > if (immutable_db_options_.atomic_flush) { > autovector cfds; > SelectColumnFamiliesForAtomicFlush(&cfds); > mutex_.Unlock(); > Status s = > AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown); > s.PermitUncheckedError(); //**TODO: What to do on error? > mutex_.Lock(); > } else { > for (auto cfd : *versions_->GetColumnFamilySet()) { > if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) { > cfd->Ref(); > mutex_.Unlock(); > Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown); > s.PermitUncheckedError(); //**TODO: What to do on error? > mutex_.Lock(); > cfd->UnrefAndTryDelete(); > } > } > } {code} > By default (avoid_flush_during_shutdown=false) RocksDb requires FlushMemtable > when Close. When the disk pressure is high or the Memtable is large, this > process will be more time-consuming, which will cause the Task to get stuck > in the Canceling stage and affect the speed of job Failover. > In fact, it is completely unnecessary to Flush memtable when Flink Task is > Close, because the data can be replayed from Checkpoint. 
So we can set > avoid_flush_during_shutdown to true to speed up Task Failover -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34015) Setting `execution.savepoint.ignore-unclaimed-state` does not take effect when passing this parameter by dynamic properties
[ https://issues.apache.org/jira/browse/FLINK-34015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805359#comment-17805359 ] Hangxiang Yu commented on FLINK-34015: -- [~zhourenxiang] Fine, it's valid. I have assigned it to you and commented on your PR, please take a look and go ahead. > Setting `execution.savepoint.ignore-unclaimed-state` does not take effect > when passing this parameter by dynamic properties > --- > > Key: FLINK-34015 > URL: https://issues.apache.org/jira/browse/FLINK-34015 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.17.0 >Reporter: Renxiang Zhou >Assignee: Renxiang Zhou >Priority: Critical > Labels: ignore-unclaimed-state-invalid, pull-request-available > Attachments: image-2024-01-08-14-22-09-758.png, > image-2024-01-08-14-24-30-665.png > > > We set `execution.savepoint.ignore-unclaimed-state` to true and use the -D option > to submit the job, but unfortunately we found the value is still false in the > jobmanager log. > Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true when > submitting the job. > !image-2024-01-08-14-22-09-758.png|width=1012,height=222! > Pic 2: The value is still false in the JM log. > !image-2024-01-08-14-24-30-665.png|width=651,height=51! > > Besides, the parameter `execution.savepoint-restore-mode` has the same > problem when we pass it by the -D option. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34015) Setting `execution.savepoint.ignore-unclaimed-state` does not take effect when passing this parameter by dynamic properties
[ https://issues.apache.org/jira/browse/FLINK-34015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu reassigned FLINK-34015: Assignee: Hangxiang Yu > Setting `execution.savepoint.ignore-unclaimed-state` does not take effect > when passing this parameter by dynamic properties > --- > > Key: FLINK-34015 > URL: https://issues.apache.org/jira/browse/FLINK-34015 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.17.0 >Reporter: Renxiang Zhou >Assignee: Hangxiang Yu >Priority: Critical > Labels: ignore-unclaimed-state-invalid, pull-request-available > Attachments: image-2024-01-08-14-22-09-758.png, > image-2024-01-08-14-24-30-665.png > > > We set `execution.savepoint.ignore-unclaimed-state` to true and use -D option > to submit the job, but unfortunately we found the value is still false in > jobmanager log. > Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true in > submiting job. > !image-2024-01-08-14-22-09-758.png|width=1012,height=222! > Pic 2: The value is still false in jmlog. > !image-2024-01-08-14-24-30-665.png|width=651,height=51! > > Besides, the parameter `execution.savepoint-restore-mode` has the same > problem since when we pass it by -D option. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34015) Setting `execution.savepoint.ignore-unclaimed-state` does not take effect when passing this parameter by dynamic properties
[ https://issues.apache.org/jira/browse/FLINK-34015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu reassigned FLINK-34015: Assignee: Renxiang Zhou (was: Hangxiang Yu) > Setting `execution.savepoint.ignore-unclaimed-state` does not take effect > when passing this parameter by dynamic properties > --- > > Key: FLINK-34015 > URL: https://issues.apache.org/jira/browse/FLINK-34015 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.17.0 >Reporter: Renxiang Zhou >Assignee: Renxiang Zhou >Priority: Critical > Labels: ignore-unclaimed-state-invalid, pull-request-available > Attachments: image-2024-01-08-14-22-09-758.png, > image-2024-01-08-14-24-30-665.png > > > We set `execution.savepoint.ignore-unclaimed-state` to true and use -D option > to submit the job, but unfortunately we found the value is still false in > jobmanager log. > Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true in > submiting job. > !image-2024-01-08-14-22-09-758.png|width=1012,height=222! > Pic 2: The value is still false in jmlog. > !image-2024-01-08-14-24-30-665.png|width=651,height=51! > > Besides, the parameter `execution.savepoint-restore-mode` has the same > problem since when we pass it by -D option. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34015][checkpoint] fix that passing ignore-unclaimed-state through dynamic props does not take effect [flink]
masteryhx commented on code in PR #24058: URL: https://github.com/apache/flink/pull/24058#discussion_r1448208494

## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java:
@@ -163,14 +163,16 @@ public static SavepointRestoreSettings forPath(
     public static void toConfiguration(
             final SavepointRestoreSettings savepointRestoreSettings,
             final Configuration configuration) {
-        configuration.set(
-                SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
-                savepointRestoreSettings.allowNonRestoredState());
-        configuration.set(
-                SavepointConfigOptions.RESTORE_MODE, savepointRestoreSettings.getRestoreMode());
-        final String savepointPath = savepointRestoreSettings.getRestorePath();
-        if (savepointPath != null) {
-            configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
+        if (!savepointRestoreSettings.equals(SavepointRestoreSettings.none())) {

Review Comment: I noticed that SavepointRestoreSettings#equals is not correct, so I created https://github.com/apache/flink/pull/24066 to fix that first; reviews welcome.

## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java:
@@ -163,14 +163,16 @@ public static SavepointRestoreSettings forPath(
     public static void toConfiguration(
             final SavepointRestoreSettings savepointRestoreSettings,
             final Configuration configuration) {
-        configuration.set(
-                SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
-                savepointRestoreSettings.allowNonRestoredState());
-        configuration.set(
-                SavepointConfigOptions.RESTORE_MODE, savepointRestoreSettings.getRestoreMode());
-        final String savepointPath = savepointRestoreSettings.getRestorePath();
-        if (savepointPath != null) {
-            configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
+        if (!savepointRestoreSettings.equals(SavepointRestoreSettings.none())) {

Review Comment: I think this method deliberately writes savepointRestoreSettings into the Configuration even for SavepointRestoreSettings.none. Some callers may rely on SavepointRestoreSettings.none to override the configuration, e.g. RemoteStreamEnvironment#getEffectiveConfiguration (though its logic also looks questionable). So how about letting the caller decide whether to call toConfiguration? WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
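The guard discussed above (skipping the write when the settings are the NONE sentinel, so that values already set via dynamic properties are not clobbered) can be sketched in isolation. This is a simplified stand-in, not Flink's actual `SavepointRestoreSettings` class; the class, field, and key names here are illustrative:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Minimal sketch of the "skip NONE" guard: writing restore settings into a
// configuration map only when they differ from the NONE sentinel, so -D
// provided values already in the configuration survive.
class RestoreSettingsSketch {
    static final RestoreSettingsSketch NONE = new RestoreSettingsSketch(false, null);

    final boolean allowNonRestoredState;
    final String restorePath;

    RestoreSettingsSketch(boolean allowNonRestoredState, String restorePath) {
        this.allowNonRestoredState = allowNonRestoredState;
        this.restorePath = restorePath;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof RestoreSettingsSketch)) {
            return false;
        }
        RestoreSettingsSketch that = (RestoreSettingsSketch) o;
        return allowNonRestoredState == that.allowNonRestoredState
                && Objects.equals(restorePath, that.restorePath);
    }

    @Override
    public int hashCode() {
        return Objects.hash(allowNonRestoredState, restorePath);
    }

    /** Writes the settings into the configuration only when they are not NONE. */
    static void toConfiguration(RestoreSettingsSketch settings, Map<String, String> conf) {
        if (settings.equals(NONE)) {
            return; // leave any values provided via -D untouched
        }
        conf.put("execution.savepoint.ignore-unclaimed-state",
                Boolean.toString(settings.allowNonRestoredState));
        if (settings.restorePath != null) {
            conf.put("execution.savepoint.path", settings.restorePath);
        }
    }
}
```

Note the guard only works once equals() is correct, which is exactly why the reviewer split the equals/hashCode fix into PR #24066.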
[jira] [Commented] (FLINK-33946) RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel
[ https://issues.apache.org/jira/browse/FLINK-33946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805355#comment-17805355 ] Yue Ma commented on FLINK-33946: [~masteryhx] thanks, I would like to take this and I'll draft the PR soon
> RocksDb sets setAvoidFlushDuringShutdown to true to speed up Task Cancel
>
> Key: FLINK-33946
> URL: https://issues.apache.org/jira/browse/FLINK-33946
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Affects Versions: 1.19.0
> Reporter: Yue Ma
> Priority: Major
> Fix For: 1.19.0
>
> When a Job fails, the task needs to be canceled and re-deployed. RocksDBStateBackend will call RocksDB.close when disposing.
> {code:cpp}
> if (!shutting_down_.load(std::memory_order_acquire) &&
>     has_unpersisted_data_.load(std::memory_order_relaxed) &&
>     !mutable_db_options_.avoid_flush_during_shutdown) {
>   if (immutable_db_options_.atomic_flush) {
>     autovector<ColumnFamilyData*> cfds;
>     SelectColumnFamiliesForAtomicFlush(&cfds);
>     mutex_.Unlock();
>     Status s = AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
>     s.PermitUncheckedError();  //**TODO: What to do on error?
>     mutex_.Lock();
>   } else {
>     for (auto cfd : *versions_->GetColumnFamilySet()) {
>       if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
>         cfd->Ref();
>         mutex_.Unlock();
>         Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
>         s.PermitUncheckedError();  //**TODO: What to do on error?
>         mutex_.Lock();
>         cfd->UnrefAndTryDelete();
>       }
>     }
>   }
> }
> {code}
> By default (avoid_flush_during_shutdown=false), RocksDB flushes the memtable on Close. When disk pressure is high or the memtable is large, this is time-consuming, which leaves the Task stuck in the Canceling stage and slows down job failover.
> In fact, flushing the memtable when a Flink Task closes is unnecessary, because the data can be replayed from the checkpoint. So we can set avoid_flush_during_shutdown to true to speed up Task failover.
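On the Flink side, the proposed change would amount to flipping one RocksDB DB option when the state backend builds its options. A minimal RocksJava sketch (this assumes the rocksdbjni dependency and is illustrative only; in practice Flink would set this inside its RocksDB state backend, not in user code):

```java
import org.rocksdb.DBOptions;

// Sketch: configure RocksDB to skip the memtable flush during DB close.
// Safe in Flink's case because unpersisted writes can be replayed from
// the last completed checkpoint.
public class AvoidFlushExample {
    public static DBOptions createOptions() {
        DBOptions options = new DBOptions();
        // avoid_flush_during_shutdown is a mutable DB option in RocksDB.
        options.setAvoidFlushDuringShutdown(true);
        return options;
    }
}
```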
Re: [PR] Test [flink]
WencongLiu closed pull request #23978: Test URL: https://github.com/apache/flink/pull/23978
[jira] [Commented] (FLINK-33819) Support setting CompressType in RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-33819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805354#comment-17805354 ] Yue Ma commented on FLINK-33819: [~masteryhx] thanks , I would like to take this and I'll draft the pr soon > Support setting CompressType in RocksDBStateBackend > --- > > Key: FLINK-33819 > URL: https://issues.apache.org/jira/browse/FLINK-33819 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Yue Ma >Priority: Major > Fix For: 1.19.0 > > Attachments: image-2023-12-14-11-32-32-968.png, > image-2023-12-14-11-35-22-306.png > > > Currently, RocksDBStateBackend does not support setting the compression > level, and Snappy is used for compression by default. But we have some > scenarios where compression will use a lot of CPU resources. Turning off > compression can significantly reduce CPU overhead. So we may need to support > a parameter for users to set the CompressType of Rocksdb. > !image-2023-12-14-11-35-22-306.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
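The knob requested in FLINK-33819 maps onto an existing RocksDB column-family option. A hedged RocksJava sketch (assumes the rocksdbjni dependency; Flink would presumably surface this as a state-backend configuration option rather than a direct RocksDB call):

```java
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompressionType;

// Sketch: disable the default Snappy compression for RocksDB column
// families, trading disk/IO for lower CPU overhead.
public class CompressionExample {
    public static ColumnFamilyOptions createOptions() {
        ColumnFamilyOptions cfOptions = new ColumnFamilyOptions();
        cfOptions.setCompressionType(CompressionType.NO_COMPRESSION);
        return cfOptions;
    }
}
```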
Re: [PR] [FLINK-34051][checkpoint] Fix equals/hashCode/toString for SavepointRestoreSettings [flink]
ljz2051 commented on code in PR #24066: URL: https://github.com/apache/flink/pull/24066#discussion_r1448240936

## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java:
@@ -105,12 +105,14 @@ public boolean equals(Object o) {
         SavepointRestoreSettings that = (SavepointRestoreSettings) o;
         return allowNonRestoredState == that.allowNonRestoredState
-                && (Objects.equals(restorePath, that.restorePath));
+                && Objects.equals(restorePath, that.restorePath)
+                && Objects.equals(restoreMode, that.restoreMode);
     }

     @Override
     public int hashCode() {
         int result = restorePath != null ? restorePath.hashCode() : 0;
+        result = 31 * result + (restoreMode != null ? restoreMode.hashCode() : 0);

Review Comment: Should the restoreMode property in SavepointRestoreSettings be @Nonnull?
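The bug this PR fixes is easy to demonstrate in isolation: if equals() ignores a field, two settings that differ only in that field wrongly compare equal, and hashCode() must be updated in lockstep. The class below is a simplified stand-in that mirrors the PR's diff, not Flink's actual class; the mode strings are illustrative:

```java
import java.util.Objects;

// Simplified stand-in for SavepointRestoreSettings showing the fixed
// equals/hashCode pair: restoreMode now participates in both.
class SettingsEquality {
    final boolean allowNonRestoredState;
    final String restorePath;
    final String restoreMode; // e.g. "CLAIM", "NO_CLAIM", "LEGACY"

    SettingsEquality(boolean allow, String path, String mode) {
        this.allowNonRestoredState = allow;
        this.restorePath = path;
        this.restoreMode = mode;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SettingsEquality)) {
            return false;
        }
        SettingsEquality that = (SettingsEquality) o;
        return allowNonRestoredState == that.allowNonRestoredState
                && Objects.equals(restorePath, that.restorePath)
                && Objects.equals(restoreMode, that.restoreMode); // the added check
    }

    @Override
    public int hashCode() {
        // Fold restoreMode in as well, keeping hashCode consistent with equals.
        return Objects.hash(allowNonRestoredState, restorePath, restoreMode);
    }
}
```

Without the restoreMode terms, a CLAIM and a NO_CLAIM restore of the same path would be considered equal, which in turn defeats any "skip if equal to none()" guard built on top of equals().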
[jira] [Commented] (FLINK-33009) tools/release/update_japicmp_configuration.sh should only enable binary compatibility checks in the release branch
[ https://issues.apache.org/jira/browse/FLINK-33009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805352#comment-17805352 ] Wencong Liu commented on FLINK-33009: - I've opened a pull request and CI has passed. 😄 > tools/release/update_japicmp_configuration.sh should only enable binary > compatibility checks in the release branch > -- > > Key: FLINK-33009 > URL: https://issues.apache.org/jira/browse/FLINK-33009 > Project: Flink > Issue Type: Bug > Components: Release System >Affects Versions: 1.19.0 >Reporter: Matthias Pohl >Assignee: Wencong Liu >Priority: Major > Labels: pull-request-available > > According to [Flink's API compatibility > constraints|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/], > we only support binary compatibility between patch versions. In > [apache-flink:pom.xml:2246|https://github.com/apache/flink/blob/aa8d93ea239f5be79066b7e5caad08d966c86ab2/pom.xml#L2246] > we have binary compatibility enabled even in {{master}}. This doesn't comply > with the rules. We should this flag disabled in {{master}}. The > {{tools/release/update_japicmp_configuration.sh}} should enable this flag in > the release branch as part of the release process. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-33905) FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs
[ https://issues.apache.org/jira/browse/FLINK-33905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song closed FLINK-33905. Resolution: Done master (1.19): 06b46a9cbf0d8fa987bbde570510f75a7558f54d > FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs > --- > > Key: FLINK-33905 > URL: https://issues.apache.org/jira/browse/FLINK-33905 > Project: Flink > Issue Type: Improvement > Components: API / Core >Affects Versions: 1.19.0 >Reporter: Wencong Liu >Assignee: Wencong Liu >Priority: Major > Labels: pull-request-available > > This ticket is proposed for > [FLIP-382|https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]
xintongsong closed pull request #23905: [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs URL: https://github.com/apache/flink/pull/23905
[jira] [Comment Edited] (FLINK-34015) Setting `execution.savepoint.ignore-unclaimed-state` does not take effect when passing this parameter by dynamic properties
[ https://issues.apache.org/jira/browse/FLINK-34015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805349#comment-17805349 ] Renxiang Zhou edited comment on FLINK-34015 at 1/11/24 2:36 AM: [~masteryhx] Many thanks for your reply~ Yes, if we use the dynamic parameters to recover from a savepoint instead of using '-s' option, the CLI will generate the SavepointRestoreSettings.none() and set it to configuration. was (Author: JIRAUSER295459): [~masteryhx] Yes, if we use the dynamic parameters to recover from a savepoint instead of using '-s' option, the CLI will generate the SavepointRestoreSettings.none() and set it to configuration. > Setting `execution.savepoint.ignore-unclaimed-state` does not take effect > when passing this parameter by dynamic properties > --- > > Key: FLINK-34015 > URL: https://issues.apache.org/jira/browse/FLINK-34015 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.17.0 >Reporter: Renxiang Zhou >Priority: Critical > Labels: ignore-unclaimed-state-invalid, pull-request-available > Attachments: image-2024-01-08-14-22-09-758.png, > image-2024-01-08-14-24-30-665.png > > > We set `execution.savepoint.ignore-unclaimed-state` to true and use -D option > to submit the job, but unfortunately we found the value is still false in > jobmanager log. > Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true in > submiting job. > !image-2024-01-08-14-22-09-758.png|width=1012,height=222! > Pic 2: The value is still false in jmlog. > !image-2024-01-08-14-24-30-665.png|width=651,height=51! > > Besides, the parameter `execution.savepoint-restore-mode` has the same > problem since when we pass it by -D option. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34015) Setting `execution.savepoint.ignore-unclaimed-state` does not take effect when passing this parameter by dynamic properties
[ https://issues.apache.org/jira/browse/FLINK-34015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805349#comment-17805349 ] Renxiang Zhou commented on FLINK-34015: --- [~masteryhx] Yes, if we use the dynamic parameters to recover from a savepoint instead of using '-s' option, the CLI will generate the SavepointRestoreSettings.none() and set it to configuration. > Setting `execution.savepoint.ignore-unclaimed-state` does not take effect > when passing this parameter by dynamic properties > --- > > Key: FLINK-34015 > URL: https://issues.apache.org/jira/browse/FLINK-34015 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.17.0 >Reporter: Renxiang Zhou >Priority: Critical > Labels: ignore-unclaimed-state-invalid, pull-request-available > Attachments: image-2024-01-08-14-22-09-758.png, > image-2024-01-08-14-24-30-665.png > > > We set `execution.savepoint.ignore-unclaimed-state` to true and use -D option > to submit the job, but unfortunately we found the value is still false in > jobmanager log. > Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true in > submiting job. > !image-2024-01-08-14-22-09-758.png|width=1012,height=222! > Pic 2: The value is still false in jmlog. > !image-2024-01-08-14-24-30-665.png|width=651,height=51! > > Besides, the parameter `execution.savepoint-restore-mode` has the same > problem since when we pass it by -D option. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
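The reported scenario is restoring a savepoint purely through dynamic properties, without the `-s`/`--fromSavepoint` flag. A sketch of such a submission (jar path and savepoint URI are placeholders):

```shell
# Pass savepoint restore settings only via -D dynamic properties;
# per this ticket, the ignore-unclaimed-state and restore-mode values
# set this way were not taking effect.
flink run \
  -D execution.savepoint.path=s3://bucket/savepoints/savepoint-abc123 \
  -D execution.savepoint.ignore-unclaimed-state=true \
  -D execution.savepoint-restore-mode=CLAIM \
  ./my-job.jar
```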
[jira] [Created] (FLINK-34058) Support optional parameters for named parameters
Feng Jin created FLINK-34058: Summary: Support optional parameters for named parameters Key: FLINK-34058 URL: https://issues.apache.org/jira/browse/FLINK-34058 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Feng Jin Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34055) Introduce a new annotation for named parameters
[ https://issues.apache.org/jira/browse/FLINK-34055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Feng Jin updated FLINK-34055: - Summary: Introduce a new annotation for named parameters (was: Introduce a new annotation for named parameters.) > Introduce a new annotation for named parameters > --- > > Key: FLINK-34055 > URL: https://issues.apache.org/jira/browse/FLINK-34055 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Feng Jin >Priority: Major > Fix For: 1.19.0 > > > Introduce a new annotation to specify the parameter name, indicate if it is > optional, and potentially support specifying default values in the future. > Deprecate the argumentNames method in FunctionHints as it is not > user-friendly for specifying argument names with optional configuration. > > {code:java} > public @interface ArgumentHint { > /** > * The name of the parameter, default is an empty string. > */ > String name() default ""; > > /** > * Whether the parameter is optional, default is false. > */ > boolean isOptional() default false; > > /** > * The data type hint for the parameter. > */ > DataTypeHint type() default @DataTypeHint(); > } > {code} > {code:java} > public @interface FunctionHint { > > /** > * Deprecated attribute for specifying the names of the arguments. > * It is no longer recommended to use this attribute. > */ > @Deprecated > String[] argumentNames() default {""}; > > /** > * Attribute for specifying the hints and additional information for > function arguments. > */ > ArgumentHint[] arguments() default {}; > } > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
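To make the proposal concrete, here is a self-contained sketch of how the proposed annotations could be applied to a function class. The annotation definitions follow the ticket's sketch (with runtime retention added and the DataTypeHint member omitted for brevity); the function class and argument names are hypothetical, and none of this is a released Flink API:

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Proposed annotations, per the ticket's sketch (simplified).
@Retention(RetentionPolicy.RUNTIME)
@interface ArgumentHint {
    String name() default "";
    boolean isOptional() default false;
}

@Retention(RetentionPolicy.RUNTIME)
@interface FunctionHint {
    ArgumentHint[] arguments() default {};
}

// Hypothetical scalar-function-like class declaring one required and one
// optional named argument via the proposed annotations.
@FunctionHint(arguments = {
    @ArgumentHint(name = "input"),
    @ArgumentHint(name = "delimiter", isOptional = true)
})
class SplitFunction {
    public String[] eval(String input, String delimiter) {
        // Optional argument: fall back to a comma when absent.
        return input.split(delimiter == null ? "," : delimiter);
    }
}
```

Compared with the deprecated `argumentNames()` string array, attaching name and optionality per argument keeps the metadata next to the argument it describes.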
[jira] [Created] (FLINK-34057) Support named parameters for functions
Feng Jin created FLINK-34057: Summary: Support named parameters for functions Key: FLINK-34057 URL: https://issues.apache.org/jira/browse/FLINK-34057 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Feng Jin Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34056) Support named parameters for procedures
Feng Jin created FLINK-34056: Summary: Support named parameters for procedures Key: FLINK-34056 URL: https://issues.apache.org/jira/browse/FLINK-34056 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Feng Jin Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32852) [JUnit5 Migration] The scheduler package of flink-runtime module
[ https://issues.apache.org/jira/browse/FLINK-32852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan updated FLINK-32852: Fix Version/s: 1.19.0 > [JUnit5 Migration] The scheduler package of flink-runtime module > > > Key: FLINK-32852 > URL: https://issues.apache.org/jira/browse/FLINK-32852 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Rui Fan >Assignee: RocMarshal >Priority: Minor > Labels: pull-request-available > Fix For: 1.19.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32852) [JUnit5 Migration] The scheduler package of flink-runtime module
[ https://issues.apache.org/jira/browse/FLINK-32852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805348#comment-17805348 ] Rui Fan commented on FLINK-32852: - Merged to master(1.19) via a6412b8497d1fdb3c0137a1651767db33836d966 > [JUnit5 Migration] The scheduler package of flink-runtime module > > > Key: FLINK-32852 > URL: https://issues.apache.org/jira/browse/FLINK-32852 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Rui Fan >Assignee: RocMarshal >Priority: Minor > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34054) FLIP-387: Support named parameters for functions and procedures
[ https://issues.apache.org/jira/browse/FLINK-34054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Feng Jin updated FLINK-34054: - Summary: FLIP-387: Support named parameters for functions and procedures (was: FLIP-387: Support named parameters for functions and call procedures) > FLIP-387: Support named parameters for functions and procedures > --- > > Key: FLINK-34054 > URL: https://issues.apache.org/jira/browse/FLINK-34054 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner >Reporter: Feng Jin >Priority: Major > Fix For: 1.19.0 > > > Umbrella issue for > https://cwiki.apache.org/confluence/display/FLINK/FLIP-387%3A+Support+named+parameters+for+functions+and+call+procedures -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-32852][JUnit5 migration] Migrate ExceptionHistoryEntryTest and RootExceptionHistoryEntryTest to Junit5 and Assertj [flink]
1996fanrui merged PR #24062: URL: https://github.com/apache/flink/pull/24062
Re: [PR] [FLINK-32852][JUnit5 migration] Migrate ExceptionHistoryEntryTest and RootExceptionHistoryEntryTest to Junit5 and Assertj [flink]
1996fanrui commented on PR #24062: URL: https://github.com/apache/flink/pull/24062#issuecomment-1886099946 Thanks @RocMarshal for the review! CI is green, merging~
[jira] [Created] (FLINK-34055) Introduce a new annotation for named parameters.
Feng Jin created FLINK-34055: Summary: Introduce a new annotation for named parameters. Key: FLINK-34055 URL: https://issues.apache.org/jira/browse/FLINK-34055 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Feng Jin Fix For: 1.19.0 Introduce a new annotation to specify the parameter name, indicate if it is optional, and potentially support specifying default values in the future. Deprecate the argumentNames method in FunctionHints as it is not user-friendly for specifying argument names with optional configuration. {code:java} public @interface ArgumentHint { /** * The name of the parameter, default is an empty string. */ String name() default ""; /** * Whether the parameter is optional, default is false. */ boolean isOptional() default false; /** * The data type hint for the parameter. */ DataTypeHint type() default @DataTypeHint(); } {code} {code:java} public @interface FunctionHint { /** * Deprecated attribute for specifying the names of the arguments. * It is no longer recommended to use this attribute. */ @Deprecated String[] argumentNames() default {""}; /** * Attribute for specifying the hints and additional information for function arguments. */ ArgumentHint[] arguments() default {}; } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33792] Generate the same code for the same logic [flink]
lsyldliu commented on code in PR #23984: URL: https://github.com/apache/flink/pull/23984#discussion_r1448202301 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala: ## @@ -87,10 +90,11 @@ object LongHashJoinGenerator { def genProjection( tableConfig: ReadableConfig, classLoader: ClassLoader, - types: Array[LogicalType]): GeneratedProjection = { + types: Array[LogicalType], + parentCtx: CodeGeneratorContext): GeneratedProjection = { Review Comment: I just mean that we can get the `tableConfig` and `classLoader` from `parentCtx`, but this isn't an important point, ignore it now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-34054) FLIP-387: Support named parameters for functions and call procedures
Feng Jin created FLINK-34054: Summary: FLIP-387: Support named parameters for functions and call procedures Key: FLINK-34054 URL: https://issues.apache.org/jira/browse/FLINK-34054 Project: Flink Issue Type: New Feature Components: Table SQL / Planner Reporter: Feng Jin Fix For: 1.19.0 Umbrella issue for https://cwiki.apache.org/confluence/display/FLINK/FLIP-387%3A+Support+named+parameters+for+functions+and+call+procedures -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [hotfix] Revert the breaking change to the public implementations of RichFunction [flink]
flinkbot commented on PR #24067: URL: https://github.com/apache/flink/pull/24067#issuecomment-1886092566
## CI report:
* 5d2c318f914b6cc790dabdf318b1788d0e0128a6 UNKNOWN
Bot commands: the @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[jira] [Assigned] (FLINK-34053) Support state TTL hint for group aggregate
[ https://issues.apache.org/jira/browse/FLINK-34053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan reassigned FLINK-34053: - Assignee: xuyang > Support state TTL hint for group aggregate > -- > > Key: FLINK-34053 > URL: https://issues.apache.org/jira/browse/FLINK-34053 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: Jane Chan >Assignee: xuyang >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30088) Excessive state updates for TtlMapState and TtlListState
[ https://issues.apache.org/jira/browse/FLINK-30088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu updated FLINK-30088: - Component/s: Runtime / State Backends > Excessive state updates for TtlMapState and TtlListState > > > Key: FLINK-30088 > URL: https://issues.apache.org/jira/browse/FLINK-30088 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Roman Boyko >Assignee: Roman Boyko >Priority: Minor > Labels: pull-request-available, stale-assigned > Attachments: image-2022-11-18-20-25-14-466.png, > image-2022-11-18-20-27-24-054.png > > > After merging the FLINK-21413 every ttl check for cleanup for TtlMapState and > TtlListState (even without expired elements) leads to whole state update. > This is because: > - comparison by link inside `TtlIncrementalCleanup`: > !image-2022-11-18-20-25-14-466.png|width=450,height=288! > - and creating new map or list inside TtlMapState or TtlListState: > !image-2022-11-18-20-27-24-054.png|width=477,height=365! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [hotfix] Revert the breaking change to the public implementations of RichFunction [flink]
WencongLiu opened a new pull request, #24067: URL: https://github.com/apache/flink/pull/24067 ## What is the purpose of the change *Revert the breaking change to the public implementations of RichFunction. The original breaking change is applied by [FLINK-32978](https://issues.apache.org/jira/browse/FLINK-32978?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel).* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34053) Support state TTL hint for group aggregate
[ https://issues.apache.org/jira/browse/FLINK-34053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805346#comment-17805346 ] xuyang commented on FLINK-34053: Hi, [~qingyue] Can I take this jira? > Support state TTL hint for group aggregate > -- > > Key: FLINK-34053 > URL: https://issues.apache.org/jira/browse/FLINK-34053 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: Jane Chan >Priority: Major >
[jira] [Updated] (FLINK-34052) Missing TopSpeedWindowing and SessionWindowing JARs in Flink Maven Repository
[ https://issues.apache.org/jira/browse/FLINK-34052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Junrui Li updated FLINK-34052: -- Component/s: Examples > Missing TopSpeedWindowing and SessionWindowing JARs in Flink Maven Repository > - > > Key: FLINK-34052 > URL: https://issues.apache.org/jira/browse/FLINK-34052 > Project: Flink > Issue Type: Bug > Components: Build System, Examples >Affects Versions: 1.18.0 >Reporter: Junrui Li >Priority: Major > > As a result of the changes implemented in FLINK-32821, the build process no > longer produces artifacts with the names > flink-examples-streaming-1.x-TopSpeedWindowing.jar and > flink-examples-streaming-1.x-SessionWindowing.jar. This has led to the > absence of these specific JAR files in the Maven repository > (https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/1.18.0/). > These artifacts were previously available and may still be expected by users > as part of their application dependencies. Their removal could potentially > break existing build pipelines and applications that depend on these example > JARs.
[jira] [Created] (FLINK-34053) Support state TTL hint for group aggregate
Jane Chan created FLINK-34053: - Summary: Support state TTL hint for group aggregate Key: FLINK-34053 URL: https://issues.apache.org/jira/browse/FLINK-34053 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Affects Versions: 1.19.0 Reporter: Jane Chan
[jira] [Resolved] (FLINK-33583) Support state TTL hint for regular join
[ https://issues.apache.org/jira/browse/FLINK-33583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan resolved FLINK-33583. --- Fix Version/s: 1.19.0 Resolution: Fixed > Support state TTL hint for regular join > --- > > Key: FLINK-33583 > URL: https://issues.apache.org/jira/browse/FLINK-33583 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API, Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: xuyang >Assignee: xuyang >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > >
[jira] [Closed] (FLINK-33583) Support state TTL hint for regular join
[ https://issues.apache.org/jira/browse/FLINK-33583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan closed FLINK-33583. - > Support state TTL hint for regular join > --- > > Key: FLINK-33583 > URL: https://issues.apache.org/jira/browse/FLINK-33583 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API, Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: xuyang >Assignee: xuyang >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > >
[jira] [Commented] (FLINK-33583) Support state TTL hint for regular join
[ https://issues.apache.org/jira/browse/FLINK-33583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805345#comment-17805345 ] Jane Chan commented on FLINK-33583: --- Fixed in master 21403e31f4761bdddf5e4e802e0e5eb9b4533202 > Support state TTL hint for regular join > --- > > Key: FLINK-33583 > URL: https://issues.apache.org/jira/browse/FLINK-33583 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API, Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: xuyang >Assignee: xuyang >Priority: Major > Labels: pull-request-available >
Re: [PR] [FLINK-33583][table-planner] Support state ttl hint for regular join [flink]
LadyForest merged PR #23752: URL: https://github.com/apache/flink/pull/23752
Re: [PR] [FLINK-33879] Avoids the potential hang of Hybrid Shuffle during redistribution [flink]
reswqa commented on code in PR #23957: URL: https://github.com/apache/flink/pull/23957#discussion_r1448184760 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java: ## @@ -187,6 +187,7 @@ public void finish() throws IOException { @Override public void close() { +storageMemoryManager.release(); Review Comment: I wonder how we guarantee that `numRequestedBuffers.get() > bufferPool.getNumBuffers()` at this time; otherwise, the buffer will be recycled to `bufferQueue` instead of `LocalBufferPool`. ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java: ## @@ -192,15 +190,38 @@ void testMetricsUpdateForBroadcastOnlyResultPartition() throws Exception { try (TieredResultPartition partition = createTieredStoreResultPartition(2, bufferPool, true)) { partition.broadcastRecord(ByteBuffer.allocate(bufferSize)); -IOMetrics ioMetrics = taskIOMetricGroup.createSnapshot(); -assertThat(ioMetrics.getResultPartitionBytes()).hasSize(1); -ResultPartitionBytes partitionBytes = - ioMetrics.getResultPartitionBytes().values().iterator().next(); -assertThat(partitionBytes.getSubpartitionBytes()) -.containsExactly(bufferSize, bufferSize); +verifySubpartitionBytes(bufferSize, bufferSize); } } +@Test +@Timeout(60) Review Comment: Local timeouts should be avoided, according to https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#avoid-timeouts-in-junit-tests.
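For readers outside the PR, the reviewer's first concern can be restated with a small, purely hypothetical model (all names invented for illustration; this is not Flink's actual memory manager): whether a released buffer goes back to the `LocalBufferPool` or is cached in an internal `bufferQueue` depends on an over-subscription check, so releasing at `close()` only returns buffers to the pool when the partition has requested more buffers than the pool currently holds.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch of the recycling decision under discussion; names and
 * structure are invented for illustration, not taken from Flink's code.
 */
public class RecycleSketch {
    final AtomicInteger numRequestedBuffers = new AtomicInteger();
    final Deque<byte[]> bufferQueue = new ArrayDeque<>();
    int poolSize;           // current size of the (shrinkable) LocalBufferPool
    int returnedToPool = 0; // buffers actually handed back to the pool

    void recycle(byte[] buffer) {
        if (numRequestedBuffers.get() > poolSize) {
            // over-subscribed (e.g. after redistribution shrank the pool):
            // give the buffer back to the LocalBufferPool
            numRequestedBuffers.decrementAndGet();
            returnedToPool++;
        } else {
            // otherwise keep it in the local queue for reuse
            bufferQueue.add(buffer);
        }
    }
}
```

The reviewer's question is exactly about the first branch: nothing in the `close()` path by itself guarantees the over-subscription condition holds, so a release there may only move buffers into the local queue.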
Re: [PR] [FLINK-34051][checkpoint] Fix equals/hashCode/toString for SavepointRestoreSettings [flink]
flinkbot commented on PR #24066: URL: https://github.com/apache/flink/pull/24066#issuecomment-1886063614 ## CI report: * 96b1dc66449cb7782351ac379f78fdd0a29cb8ac UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-34051) Fix equals/hashCode/toString for SavepointRestoreSettings
[ https://issues.apache.org/jira/browse/FLINK-34051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34051: --- Labels: pull-request-available (was: ) > Fix equals/hashCode/toString for SavepointRestoreSettings > - > > Key: FLINK-34051 > URL: https://issues.apache.org/jira/browse/FLINK-34051 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.19.0 >Reporter: Hangxiang Yu >Assignee: Hangxiang Yu >Priority: Minor > Labels: pull-request-available > > SavepointRestoreSettings#equals/hashCode/toString missed restoreMode property
[PR] [FLINK-34051][checkpoint] Fix equals/hashCode/toString for SavepointRestoreSettings [flink]
masteryhx opened a new pull request, #24066: URL: https://github.com/apache/flink/pull/24066 ## What is the purpose of the change SavepointRestoreSettings#equals/hashCode/toString missed restoreMode property ## Brief change log - Add restoreMode property for SavepointRestoreSettings ## Verifying this change This change added tests and can be verified as follows: - Added SavepointRestoreSettingsTest ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? no
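The class of bug this PR fixes is easy to reproduce in miniature. The sketch below is a hypothetical stand-in, not the real `SavepointRestoreSettings`: when `equals`/`hashCode` omit a field (here `restoreMode`), two settings objects that differ only in that field compare as equal.

```java
import java.util.Objects;

/**
 * Hypothetical stand-in for a settings class whose equals/hashCode forgot a
 * field. Field and class names are illustrative, not Flink's real API.
 */
public class RestoreSettingsSketch {
    final String restorePath;
    final boolean allowNonRestoredState;
    final String restoreMode; // the field the fix adds to equals/hashCode/toString

    RestoreSettingsSketch(String path, boolean allow, String mode) {
        this.restorePath = path;
        this.allowNonRestoredState = allow;
        this.restoreMode = mode;
    }

    // Buggy variant: restoreMode is silently ignored.
    boolean buggyEquals(RestoreSettingsSketch o) {
        return Objects.equals(restorePath, o.restorePath)
                && allowNonRestoredState == o.allowNonRestoredState;
    }

    // Fixed variant: every field participates.
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof RestoreSettingsSketch)) {
            return false;
        }
        RestoreSettingsSketch o = (RestoreSettingsSketch) obj;
        return buggyEquals(o) && Objects.equals(restoreMode, o.restoreMode);
    }

    @Override
    public int hashCode() {
        return Objects.hash(restorePath, allowNonRestoredState, restoreMode);
    }
}
```

With the buggy variant, a test comparing settings restored with `CLAIM` vs. `NO_CLAIM` mode would pass when it should fail, which is why adding a dedicated `SavepointRestoreSettingsTest` alongside the fix matters.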
Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]
mas-chen commented on code in PR #44: URL: https://github.com/apache/flink-connector-kafka/pull/44#discussion_r1448066046 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/DynamicKafkaSource.java: ## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.source; Review Comment: Let me know if you have any other considerations. I'll squash the commits if we are in agreement (leaving the dangling commit now for your reviewing convenience) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Add Hive 3.0.0 connector [flink-web]
snuyanzin opened a new pull request, #709: URL: https://github.com/apache/flink-web/pull/709 (no comment)
Re: [PR] [FLINK-33611] [flink-protobuf] Support Large Protobuf Schemas [flink]
sharath1709 commented on PR #23937: URL: https://github.com/apache/flink/pull/23937#issuecomment-1885748350 @libenchao Could you review this PR again? It has been updated as per our discussion on the JIRA ticket, and I also added a test case that fails without the changes in this PR.
[jira] [Commented] (FLINK-34007) Flink Job stuck in suspend state after losing leadership in HA Mode
[ https://issues.apache.org/jira/browse/FLINK-34007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805294#comment-17805294 ] Zhenqiu Huang commented on FLINK-34007: --- [~mapohl] Yes, I mistakenly looked into the Flink 1.17 source code. I uploaded another debug log above. The KubernetesLeaderElector checks the annotation "control-plane.alpha.kubernetes.io/leader" and whether the lockIdentity exists in its content. Given this job only has 1 job manager, there should be no other job manager instance trying to acquire the lock. The only possibility is that somehow the cluster config map is returned incorrectly. In this case, even if the fabric8 LeaderElector keeps trying to acquire leadership (if it can do so without exceeding the deadline), Flink will not be able to restart services (such as the RM and dispatcher) because the DefaultLeaderRetrievalService is stopped as well. To resolve the issue for now, should we focus on gracefully shutting down the Job Manager rather than moving the job to Suspended status? > Flink Job stuck in suspend state after losing leadership in HA Mode > --- > > Key: FLINK-34007 > URL: https://issues.apache.org/jira/browse/FLINK-34007 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.3, 1.17.2, 1.18.1, 1.18.2 >Reporter: Zhenqiu Huang >Priority: Major > Attachments: Debug.log, job-manager.log > > > The observation is that Job manager goes to suspend state with a failed > container not able to register itself to resource manager after timeout. > JM Log, see attached >
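The leadership check described in the comment can be boiled down to a few lines. This is a hedged, simplified sketch — the class and method names are illustrative, not Flink's or fabric8's real API — of reading the `control-plane.alpha.kubernetes.io/leader` annotation from the leader ConfigMap and testing whether our lock identity appears in its content:

```java
import java.util.Map;

/**
 * Simplified, hypothetical model of the ConfigMap-based leadership check:
 * leadership is held only if the leader annotation exists and contains our
 * lock identity. Not Flink's real KubernetesLeaderElector code.
 */
public class LeaderCheckSketch {
    static final String LEADER_ANNOTATION = "control-plane.alpha.kubernetes.io/leader";

    // If the ConfigMap comes back without the annotation, or with another
    // holder's identity, the elector concludes leadership was lost.
    static boolean hasLeadership(Map<String, String> annotations, String lockIdentity) {
        String record = annotations.get(LEADER_ANNOTATION);
        return record != null && record.contains(lockIdentity);
    }
}
```

This makes the failure mode in the comment concrete: a single incorrectly returned ConfigMap (missing or stale annotation) is enough to make the check return false even though no competing job manager exists.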
[jira] [Commented] (FLINK-34023) Expose Kinesis client retry config in sink
[ https://issues.apache.org/jira/browse/FLINK-34023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805279#comment-17805279 ] Brad Atcheson commented on FLINK-34023: --- Yes. I'll look into it, post a proposed solution approach here, and wait for agreement before starting to implement. > Expose Kinesis client retry config in sink > -- > > Key: FLINK-34023 > URL: https://issues.apache.org/jira/browse/FLINK-34023 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kinesis >Reporter: Brad Atcheson >Priority: Major > > The consumer side exposes client retry configuration like > [flink.shard.getrecords.maxretries|https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.html#SHARD_GETRECORDS_RETRIES] > but the producer side lacks similar config for PutRecords. > The KinesisStreamsSinkWriter constructor calls > {code} > this.httpClient = > AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties); > this.kinesisClient = buildClient(kinesisClientProperties, this.httpClient); > {code} > But those methods only refer to these values (aside from > endpoint/region/creds) in the kinesisClientProperties: > * aws.http-client.max-concurrency > * aws.http-client.read-timeout > * aws.trust.all.certificates > * aws.http.protocol.version > Without control over retry, users can observe exceptions like {code}Request > attempt 2 failure: Unable to execute HTTP request: connection timed out after > 2000 ms: kinesis.us-west-2.amazonaws.com{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
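The gap the ticket describes is visible in the configuration surface itself. Below is a hedged sketch (example values; the key strings are the ones listed in the issue text, plus the standard `aws.region` key) of the client properties the sink writer currently honors — note that no retry/backoff key exists among them, which is exactly what this issue asks to add:

```java
import java.util.Properties;

/**
 * Sketch of the Kinesis sink client settings described in the issue. Values
 * are illustrative examples; only the keys are taken from the issue text.
 */
public class KinesisClientProps {
    public static Properties sinkClientProperties() {
        Properties props = new Properties();
        props.setProperty("aws.region", "us-west-2");
        props.setProperty("aws.http-client.max-concurrency", "10000");
        props.setProperty("aws.http-client.read-timeout", "360000");
        props.setProperty("aws.trust.all.certificates", "false");
        props.setProperty("aws.http.protocol.version", "HTTP2");
        // No equivalent of the consumer-side
        // flink.shard.getrecords.maxretries exists for PutRecords yet.
        return props;
    }
}
```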
[jira] [Updated] (FLINK-34007) Flink Job stuck in suspend state after losing leadership in HA Mode
[ https://issues.apache.org/jira/browse/FLINK-34007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenqiu Huang updated FLINK-34007: -- Attachment: Debug.log > Flink Job stuck in suspend state after losing leadership in HA Mode > --- > > Key: FLINK-34007 > URL: https://issues.apache.org/jira/browse/FLINK-34007 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.3, 1.17.2, 1.18.1, 1.18.2 >Reporter: Zhenqiu Huang >Priority: Major > Attachments: Debug.log, job-manager.log > > > The observation is that Job manager goes to suspend state with a failed > container not able to register itself to resource manager after timeout. > JM Log, see attached >
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1885629516 > I haven't gone through all the code and comments yet, there are some inline comments below, however, two questions came into my mind while reviewing the PR: > > * Why do you need to refactor `ParameterizedPredicate` and `JdbcFilterPushdownPreparedStatementVisitor`? Can you just adapt `JdbcRowDataLookupFunction` just like `JdbcRowDataInputFormat`? > * Have you considered `PrepareStatement` way to handle literals, as we have already discussed in the Jira? (I'm not sure about this, but looking at `JdbcRowDataLookupFunction`, it seems no place are handling this, so I assume that the implementation does not address that) I am new to this area. I was looking to work within the existing design with minimal changes (the design was added in a FLIP and works for scan queries). I think we have something that solves this critical issue. @libenchao, would you be OK to proceed with this design and raise a subsequent issue / FLIP for a more elegant design? Or are you thinking this design is not appropriate and should not be merged?
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1447891687 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java: ## @@ -78,6 +79,8 @@ public class JdbcDynamicTableSource private List resolvedPredicates = new ArrayList<>(); private Serializable[] pushdownParams = new Serializable[0]; +private List pushdownParameterizedPredicates = new ArrayList<>(); Review Comment: We tried that originally, but the problem was that the template contains a ? and backticks, which could also appear in the column name. So last week I changed the design to pass through the index of the placeholder, to remove the ambiguity.
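The ambiguity the comment describes can be shown with a tiny, purely hypothetical model (field and class names invented for illustration, not the PR's actual `ParameterizedPredicate`): when a quoted column name may itself contain `?`, locating the JDBC placeholder by scanning the template string is unreliable, while carrying the placeholder's index explicitly is unambiguous.

```java
/**
 * Hypothetical sketch of a pushdown predicate template whose column name
 * contains a '?', making string scanning for the JDBC placeholder ambiguous.
 */
public class PredicateSketch {
    final String template;       // e.g. "`col?name` = ?"
    final int placeholderIndex;  // explicit position of the real JDBC '?' placeholder

    PredicateSketch(String template, int placeholderIndex) {
        this.template = template;
        this.placeholderIndex = placeholderIndex;
    }

    // The naive approach: finds the '?' inside the backtick-quoted column
    // name rather than the actual placeholder.
    static int firstQuestionMark(String template) {
        return template.indexOf('?');
    }
}
```

For `` `col?name` = ? ``, `firstQuestionMark` returns 4 (inside the quoted identifier), while the real placeholder sits at index 13 — which is why the design passes the index through instead of re-deriving it.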
Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]
mas-chen commented on code in PR #44: URL: https://github.com/apache/flink-connector-kafka/pull/44#discussion_r1447804230 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/DynamicKafkaSource.java: ## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.source; Review Comment: I actually prefer `dynamic.source` since it makes sense for future usecases to leverage the dynamic code. e.g. a `dynamic.sink`. With that being said, I'm going to move the metadata up one package since that isn't source specific. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33971]Specifies whether to use HBase table that supports dynamic columns. [flink-connector-hbase]
ferenc-csaky commented on PR #36: URL: https://github.com/apache/flink-connector-hbase/pull/36#issuecomment-1885437651 > @ferenc-csaky Want to take a look? I'll review it in the next 2 days. Thanks for your contribution @MOBIN-F!
Re: [PR] [FLINK-28915] Support fetching artifacts in native K8s and standalone application mode [flink]
flinkbot commented on PR #24065: URL: https://github.com/apache/flink/pull/24065#issuecomment-1885431512 ## CI report: * b779d13403028c290b803501a66f8c212a92ad8b UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-28915] Support fetching artifacts in native K8s and standalone application mode [flink]
ferenc-csaky opened a new pull request, #24065: URL: https://github.com/apache/flink/pull/24065 ## What is the purpose of the change Support fetching the remote job JAR and additional artifacts (UDFs, formats, dependencies, etc.) in native Kubernetes and standalone application mode. The current change contains fetchers for DFS (via the Flink FS abstraction) and HTTP. ## Brief change log * In standalone app mode, a `--jars` option is added that accepts an unlimited number of arguments; the listed artifacts will be fetched before the Flink cluster starts. Example: ```sh ./bin/standalone-job.sh start-foreground \ --jars http://localhost:/flink-sandbox.jar http://localhost:/test-udf.jar \ --job-classname org.apache.flink.DummyJob ``` * In native K8s app mode, the user can define additional artifacts via the `user.artifacts.artifact-list` property. Example: ```sh ./bin/flink run-application \ --target kubernetes-application \ -Dkubernetes.cluster-id=flink-cluster \ -Dkubernetes.container.image.ref=flink \ -Duser.artifacts.artifact-list=http://host.minikube.internal:/test-udf.jar \ http://host.minikube.internal:/flink-sandbox.jar ``` ## Verifying this change This change added tests and can be verified as follows: * Added tests for artifact fetching utils. * Added tests for artifact fetching logic. * Added tests to cover the changes in `DefaultPackagedProgramRetriever`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: Artifact deployment in native Kubernetes app mode. - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? docs -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-31481][table] Support enhanced show databases syntax [flink]
jeyhunkarimov commented on code in PR #23612: URL: https://github.com/apache/flink/pull/23612#discussion_r1447763299 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowDatabasesConverter.java: ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.dql.SqlShowDatabases; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ShowDatabasesOperation; + +/** A converter for {@link SqlShowDatabases}. */ +public class SqlShowDatabasesConverter implements SqlNodeConverter { + +@Override +public Operation convertSqlNode(SqlShowDatabases sqlShowDatabases, ConvertContext context) { +if (sqlShowDatabases.getPreposition() == null) { +return new ShowDatabasesOperation( +sqlShowDatabases.getLikeType(), +sqlShowDatabases.getLikeSqlPattern(), +sqlShowDatabases.isNotLike()); +} else { +CatalogManager catalogManager = context.getCatalogManager(); +String[] fullCatalogName = sqlShowDatabases.getCatalog(); +String catalogName = +fullCatalogName.length == 0 +? 
catalogManager.getCurrentCatalog() +: fullCatalogName[0]; Review Comment: Sorry for the confusion. No, there is no case where `preposition` is `not null` and `catalog` is `null`. In fact, based on our discussion, I have already removed the `preposition` field from `ShowDatabasesOperation`. `catalog` is null (and so is the `preposition`, which doesn't exist now) only when there is no `FROM/IN` clause. Do the changes in `ShowDatabasesOperation` reflect your comments, or did I miss something?
Re: [PR] [FLINK-33969] Implement restore tests for TableSourceScan node [flink]
bvarghese1 commented on code in PR #24020: URL: https://github.com/apache/flink/pull/24020#discussion_r1447722550 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceScanTestPrograms.java: ## @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.table.utils.DateTimeUtils; +import org.apache.flink.types.Row; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecTableSourceScan}. 
*/ +public class TableSourceScanTestPrograms { + +static final Row[] BEFORE_DATA = { +Row.of(1, 1L, "hi", DateTimeUtils.toLocalDateTime(1586937601000L)), +Row.of(2, 2L, "hello", DateTimeUtils.toLocalDateTime(1586937602000L)), +Row.of(3, 2L, "hello world", DateTimeUtils.toLocalDateTime(1586937603000L)) +}; + +static final Row[] AFTER_DATA = { +Row.of(4, 4L, "foo", DateTimeUtils.toLocalDateTime(1586937614000L)), +Row.of(5, 2L, "foo bar", DateTimeUtils.toLocalDateTime(1586937615000L)), +}; + +static final TableTestProgram PROJECT_PUSHDOWN = +TableTestProgram.of( +"table-source-scan-project-pushdown", +"validates table source scan with project pushdown") +.setupTableSource( +SourceTestStep.newBuilder("source_t") +.addSchema("a INT", "b BIGINT", "c VARCHAR") +.producedBeforeRestore(BEFORE_DATA) +.producedAfterRestore(AFTER_DATA) +.build()) +.setupTableSink( +SinkTestStep.newBuilder("sink_t") +.addSchema("a INT", "b VARCHAR") +.consumedBeforeRestore( +"+I[1, hi]", "+I[2, hello]", "+I[3, hello world]") +.consumedAfterRestore("+I[4, foo]", "+I[5, foo bar]") +.build()) +.runSql("INSERT INTO sink_t SELECT a, c FROM source_t") +.build(); + +static final TableTestProgram PROJECT_PUSHDOWN_DISABLED = +TableTestProgram.of( +"table-source-scan-project-push-down-disabled", +"validates table source scan with project pushdown disabled") +.setupTableSource( +SourceTestStep.newBuilder("source_t") +.addSchema( +"a INT", +"b BIGINT", +"c VARCHAR", +"ts TIMESTAMP(3) METADATA") +.addOption("readable-metadata", "ts:TIMESTAMP(3)") +.addOption("enable-projection-push-down", "false") +.producedBeforeRestore(BEFORE_DATA) +.producedAfterRestore(AFTER_DATA) +.build()) +.setupTableSink( +SinkTestStep.newBuilder("sink_t") +.addSchema("a INT", "c VARCHAR") +.consumedBeforeRestore( +"+I[1, hi]", "+I[2, hello]", "+I[3, hello world]") +.consumedAfterRestore("+I[4, foo]", "+I[5, foo bar]") +.build()) +.runSql("INSERT INTO sink_t SELECT a, c FROM source_t") +.build(); + +static final TableTestProgram 
FILTER_PUSHDOWN = +TableTestProgram.of( +
Re: [PR] [FLINK-33969] Implement restore tests for TableSourceScan node [flink]
bvarghese1 commented on code in PR #24020: URL: https://github.com/apache/flink/pull/24020#discussion_r1447721761 ## flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java: ## @@ -198,6 +198,18 @@ public SqlTestStep getRunSqlTestStep() { return (SqlTestStep) sqlSteps.get(0); } +/** + * Convenience method to avoid boilerplate code. It assumes only one statement set is tested. + */ +public StatementSetTestStep getRunStatementSetTestStep() { +List statementSetSteps = +runSteps.stream() +.filter(s -> s.getKind() == TestKind.STATEMENT_SET) +.collect(Collectors.toList()); + +return (StatementSetTestStep) statementSetSteps.get(0); Review Comment: Added a precondition check to verify similar to `getRunSqlTestStep`
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
hlteoh37 commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1447650514

## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkWriter.java:

@@ -0,0 +1,267 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.prometheus.sink;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier;
import org.apache.flink.connector.prometheus.sink.prometheus.Remote;
import org.apache.flink.connector.prometheus.sink.prometheus.Types;

import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.io.CloseMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

import static org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_SAMPLES_DROPPED;
import static org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_SAMPLES_NON_RETRIABLE_DROPPED;
import static org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_SAMPLES_OUT;
import static org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_SAMPLES_RETRY_LIMIT_DROPPED;
import static org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_WRITE_REQUESTS_OUT;
import static org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_WRITE_REQUESTS_PERMANENTLY_FAILED;

/** Writer, taking care of batching the {@link PrometheusTimeSeries} and handling retries. */
public class PrometheusSinkWriter extends AsyncSinkWriter {

    /**
     * Batching of this sink is in terms of Samples, not bytes. The goal is to adaptively increase
     * the number of Samples in each batch (a WriteRequest sent to Prometheus) up to a configurable
     * number: the parameter maxBatchSizeInBytes.
     *
     * getSizeInBytes(requestEntry) returns the number of Samples (not bytes), and
     * maxBatchSizeInBytes is actually in terms of Samples (not bytes).
     *
     * In AsyncSinkWriter, maxBatchSize is in terms of request entries (TimeSeries). But because
     * each TimeSeries contains one or more Samples, we set maxBatchSize = maxBatchSizeInBytes.
     *
     * maxRecordSizeInBytes is also calculated in the unit assumed by getSizeInBytes(..); in our
     * case, the max number of Samples in a single TimeSeries sent to the Sink. We limit the
     * number of Samples in each TimeSeries to the max batch size, setting
     * maxRecordSizeInBytes = maxBatchSizeInBytes.
     */
    private static final Logger LOG = LoggerFactory.getLogger(PrometheusSinkWriter.class);

    private final SinkCounters counters;
    private final CloseableHttpAsyncClient asyncHttpClient;
    private final PrometheusRemoteWriteHttpRequestBuilder requestBuilder;

    public PrometheusSinkWriter(
            ElementConverter elementConverter,
            Sink.InitContext context,
            int maxInFlightRequests,
            int maxBufferedRequests,
            int maxBatchSizeInSamples,
            long maxTimeInBufferMS,
            String prometheusRemoteWriteUrl,
            CloseableHttpAsyncClient asyncHttpClient,
            SinkCounters counters,
            PrometheusRequestSigner requestSigner) {
        this(
                elementConverter,
                context,
                maxInFlightRequests,
                maxBufferedRequests,
                maxBatchSi
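The unit convention described in the Javadoc above, where the "bytes"-named knobs of `AsyncSinkWriter` are deliberately repurposed to count Samples, can be illustrated with a minimal sketch. `TimeSeries` and the method names below are simplified stand-ins for the connector's actual types, not the real implementation.

```java
import java.util.List;

// Sketch of sample-count sizing: getSizeInBytes() returns the number of
// Samples in a request entry, so maxBatchSizeInBytes effectively acts as a
// maximum sample count per Prometheus WriteRequest.
class SampleSizing {
    static class TimeSeries {
        final List<Double> samples;
        TimeSeries(List<Double> samples) { this.samples = samples; }
    }

    // "Size" of a request entry = number of Samples, not serialized bytes.
    static long getSizeInBytes(TimeSeries entry) {
        return entry.samples.size();
    }

    // A batch is considered full once the accumulated sample count reaches
    // the configured cap (named maxBatchSizeInBytes in AsyncSinkWriter terms).
    static boolean batchIsFull(List<TimeSeries> batch, long maxBatchSizeInSamples) {
        long total = batch.stream().mapToLong(SampleSizing::getSizeInBytes).sum();
        return total >= maxBatchSizeInSamples;
    }
}
```

The design choice this illustrates: `AsyncSinkWriter` only cares that `getSizeInBytes` and the size limits agree on a unit, so a connector may pick any unit (here, Samples) as long as it is used consistently across all three parameters.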
Re: [PR] [FLINK-33914][ci] Adds basic Flink CI workflow [flink]
XComp commented on PR #23970: URL: https://github.com/apache/flink/pull/23970#issuecomment-1885215058

I triggered another workflow with lower timeouts: https://github.com/XComp/flink/actions/runs/7478072394 ...to double-check that running into timeouts works as expected.
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
hlteoh37 commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1447607637

## amp-request-signer/pom.xml:

@@ -0,0 +1,63 @@
4.0.0

org.apache.flink
flink-prometheus
1.0.0-SNAPSHOT

Flink : Connectors : Prometheus : AMP request signer
org.apache.flink.connector.prometheus
amp-request-signer

UTF-8
11
${target.java.version}
${target.java.version}

1.16.0

Review Comment: Should we consider 1.17? Since the supported version is 1.17 + 1.18 now.

## amp-request-signer/pom.xml:

@@ -0,0 +1,63 @@ (same POM as above, plus the dependency below)

org.apache.flink
flink-connector-prometheus
${project.version}

Review Comment: IIUC, this should be `provided`!

## pom.xml:

@@ -0,0 +1,82 @@
4.0.0

org.apache.flink
flink-prometheus

Review Comment: Should we keep the same convention as other connector repos, `flink-connector-prometheus-parent`?

## amp-request-signer/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java:

@@ -0,0 +1,94 @@
(Apache License 2.0 header, as above)

package org.apache.flink.connector.prometheus.sink.aws;

import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
import org.apache.flink.util.Preconditions;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSSessionCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.util.BinaryUtils;
import org.apache.commons.lang3.StringUtils;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;

/** Signs a Remote-Write request to Amazon Managed Service for Prometheus (AMP). */
public class AmazonManagedPrometheusWriteRequestSigner implements PrometheusRequestSigner {

    private final URL remoteWriteUrl;
    private final String awsRegion;

    /**
     * Constructor.
     *
     * @param remoteWriteUrl URL of the remote-write endpoint
     * @param awsRegion Region of the AMP workspace
     */
    public AmazonManagedPrometheusWriteRequestSigner(String remoteWriteUrl, String awsRegion) {
        Preconditions.checkArgument(StringUtils.isNotBlank(awsRegion));

Review Comment: Should we add a message here in the event of failure?
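The reviewer's suggestion amounts to passing an error message alongside the condition, so that a blank region fails with an explanation rather than a bare `IllegalArgumentException`. A minimal self-contained sketch follows; `SignerArgs` and its local `checkArgument` are stand-ins for illustration (the real `org.apache.flink.util.Preconditions` lives in flink-core), and `validateRegion` is a hypothetical helper, not the connector's API.

```java
// Self-contained sketch of the "checkArgument with message" pattern the
// review comment asks for; checkArgument here mimics the two-argument style.
class SignerArgs {
    static void checkArgument(boolean condition, Object errorMessage) {
        if (!condition) {
            throw new IllegalArgumentException(String.valueOf(errorMessage));
        }
    }

    // Hypothetical helper: with a message attached, a blank awsRegion fails
    // with a self-explanatory error instead of an anonymous one.
    static String validateRegion(String awsRegion) {
        checkArgument(
                awsRegion != null && !awsRegion.trim().isEmpty(),
                "awsRegion must not be blank");
        return awsRegion;
    }
}
```

The payoff shows up in logs: the stack trace of the failed precondition then states which argument was invalid and why, which matters for a constructor typically invoked far from user code.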
## amp-request-signer/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AWS4SignerBase.java:

@@ -0,0 +1,290 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, sof
[jira] [Commented] (FLINK-34007) Flink Job stuck in suspend state after losing leadership in HA Mode
[ https://issues.apache.org/jira/browse/FLINK-34007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805217#comment-17805217 ] Matthias Pohl commented on FLINK-34007: --- The fix you shared ended up in 6.0.0. That would mean that we shouldn't experience the issue in a Flink 1.18 cluster. The fabric8 client was updated to 6.6.2 in 1.18.0 (FLINK-31997). > Flink Job stuck in suspend state after losing leadership in HA Mode > --- > > Key: FLINK-34007 > URL: https://issues.apache.org/jira/browse/FLINK-34007 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.3, 1.17.2, 1.18.1, 1.18.2 >Reporter: Zhenqiu Huang >Priority: Major > Attachments: job-manager.log > > > The observation is that Job manager goes to suspend state with a failed > container not able to register itself to resource manager after timeout. > JM Log, see attached > -- This message was sent by Atlassian Jira (v8.20.10#820010)