Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on PR #23927: URL: https://github.com/apache/flink/pull/23927#issuecomment-1862278523 Hi @TanYuxin-tyx could you please take a look at this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33588) Fix Flink Checkpointing Statistics Bug
[ https://issues.apache.org/jira/browse/FLINK-33588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798468#comment-17798468 ] Jing Ge commented on FLINK-33588: - [~zhutong66] Thanks for the feedback. Could you please add UT or IT? > Fix Flink Checkpointing Statistics Bug > -- > > Key: FLINK-33588 > URL: https://issues.apache.org/jira/browse/FLINK-33588 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.14.5, 1.16.0, 1.17.0, 1.15.2, 1.14.6, 1.18.0, 1.17.1 >Reporter: Tongtong Zhu >Assignee: Tongtong Zhu >Priority: Critical > Labels: pull-request-available > Fix For: 1.19.0, 1.18.1 > > Attachments: FLINK-33588.patch, image-2023-12-11-17-35-23-391.png, > image-2023-12-13-11-35-43-780.png, image-2023-12-15-13-59-28-201.png, > image-2023-12-19-14-02-49-083.png, image-2023-12-19-14-03-27-062.png, > newCommit-FLINK-33688.patch > > > When the Flink task is first started, the checkpoint data is null due to the > lack of data, and Percentile throws a null pointer exception when calculating > the percentage. After multiple tests, I found that it is necessary to set an > initial value for the statistical data value of the checkpoint when the > checkpoint data is null (i.e. at the beginning of the task) to solve this > problem. > The following is an abnormal description of the bug: > 2023-09-13 15:02:54,608 ERROR > org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler > [] - Unhandled exception. 
> org.apache.commons.math3.exception.NullArgumentException: input array > at > org.apache.commons.math3.util.MathArrays.verifyValues(MathArrays.java:1650) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.commons.math3.stat.descriptive.AbstractUnivariateStatistic.test(AbstractUnivariateStatistic.java:158) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.commons.math3.stat.descriptive.rank.Percentile.evaluate(Percentile.java:272) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.commons.math3.stat.descriptive.rank.Percentile.evaluate(Percentile.java:241) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogramStatistics$CommonMetricsSnapshot.getPercentile(DescriptiveStatisticsHistogramStatistics.java:159) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogramStatistics.getQuantile(DescriptiveStatisticsHistogramStatistics.java:53) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.flink.runtime.checkpoint.StatsSummarySnapshot.getQuantile(StatsSummarySnapshot.java:108) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.flink.runtime.rest.messages.checkpoints.StatsSummaryDto.valueOf(StatsSummaryDto.java:81) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.createCheckpointingStatistics(CheckpointingStatisticsHandler.java:129) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleRequest(CheckpointingStatisticsHandler.java:84) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleRequest(CheckpointingStatisticsHandler.java:58) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > 
org.apache.flink.runtime.rest.handler.job.AbstractAccessExecutionGraphHandler.handleRequest(AbstractAccessExecutionGraphHandler.java:68) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$0(AbstractExecutionGraphHandler.java:87) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) > [?:1.8.0_151] > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > [?:1.8.0_151] > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > [?:1.8.0_151] > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > [?:1.8.0_151] > at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_151] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > [?:1.8.0_151] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > [?:1.8.0_151] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > [?:1.8.0_151] > at >
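The failure path above can be avoided by guarding the quantile computation against the empty sample that exists before the first checkpoint completes. A minimal pure-Java sketch of the guard idea (NOT the actual Flink patch, which goes through commons-math3's `Percentile`; the class name, initial value, and nearest-rank estimator here are illustrative assumptions):

```java
import java.util.Arrays;

public class StatsSummaryGuard {
    // Sketch of the guard idea: return an initial value instead of letting
    // the percentile computation throw when no checkpoint statistics exist.
    static double safeQuantile(double[] values, double quantile) {
        if (values == null || values.length == 0) {
            return 0.0; // assumed initial value for the "no checkpoints yet" case
        }
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        // nearest-rank percentile; commons-math3 uses a different estimator
        int idx = (int) Math.ceil(quantile * sorted.length) - 1;
        return sorted[Math.max(0, Math.min(idx, sorted.length - 1))];
    }
}
```

With this shape of guard, a REST call issued right after job start gets a well-defined value instead of an unhandled `NullArgumentException`.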
Re: [PR] [FLINK-26088][Connectors/ElasticSearch] Add Elasticsearch 8.0 support [flink-connector-elasticsearch]
rinkako commented on PR #53: URL: https://github.com/apache/flink-connector-elasticsearch/pull/53#issuecomment-1862265394 > > @mtfelisb Are you still active on this PR? > > Hi, @MartijnVisser. Yes, I'm waiting for review. Also, I'll not implement the `DynamicTableSink` because @rinkako started [here](https://github.com/rinkako/flink-connector-elasticsearch_es8/tree/FLINK-26088/flink-connector-elasticsearch8/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table) based on my pull request. Thanks by the way @rinkako! ☺️ thanks @mtfelisb , I'm willing to PR my code.
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
liming30 commented on code in PR #23922: URL: https://github.com/apache/flink/pull/23922#discussion_r1430990459 ## flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java: ## @@ -359,6 +365,58 @@ public void testDbPathRelativePaths() throws Exception { rocksDbBackend.setDbStoragePath("relative/path"); } +@Test +@Timeout(value = 60) +public void testCleanRelocatedDbLogs() throws Exception { +final File folder = tempFolder.newFolder(); +final File relocatedDBLogDir = tempFolder.newFolder("db_logs"); +final File logFile = new File(relocatedDBLogDir, "taskManager.log"); +Files.createFile(logFile.toPath()); +System.setProperty("log.file", logFile.getAbsolutePath()); + +Configuration conf = new Configuration(); +conf.set(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.DEBUG_LEVEL); +conf.set(RocksDBConfigurableOptions.LOG_FILE_NUM, 4); +conf.set(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE, MemorySize.parse("1kb")); +final EmbeddedRocksDBStateBackend rocksDbBackend = +new EmbeddedRocksDBStateBackend().configure(conf, getClass().getClassLoader()); +final String dbStoragePath = new Path(folder.toURI().toString()).toString(); +rocksDbBackend.setDbStoragePath(dbStoragePath); + +final MockEnvironment env = getMockEnvironment(tempFolder.newFolder()); +RocksDBKeyedStateBackend keyedBackend = +createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE); +// clear unused file +FileUtils.deleteFileOrDirectory(logFile); + +File instanceBasePath = keyedBackend.getInstanceBasePath(); +File instanceRocksDBPath = + RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath); + +// avoid tests without relocate. +Assume.assumeTrue(instanceRocksDBPath.getAbsolutePath().length() <= 255 - "_LOG".length()); Review Comment: I noticed that the length limit issue was being addressed in `frocksdb`, which would ensure relocations occurred, but there was no further progress on the related PR. 
https://github.com/ververica/frocksdb/pull/66
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
liming30 commented on code in PR #23922: URL: https://github.com/apache/flink/pull/23922#discussion_r1430988502 ## flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java: ## @@ -359,6 +365,58 @@ public void testDbPathRelativePaths() throws Exception { rocksDbBackend.setDbStoragePath("relative/path"); } +@Test +@Timeout(value = 60) +public void testCleanRelocatedDbLogs() throws Exception { +final File folder = tempFolder.newFolder(); +final File relocatedDBLogDir = tempFolder.newFolder("db_logs"); +final File logFile = new File(relocatedDBLogDir, "taskManager.log"); +Files.createFile(logFile.toPath()); +System.setProperty("log.file", logFile.getAbsolutePath()); + +Configuration conf = new Configuration(); +conf.set(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.DEBUG_LEVEL); +conf.set(RocksDBConfigurableOptions.LOG_FILE_NUM, 4); +conf.set(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE, MemorySize.parse("1kb")); +final EmbeddedRocksDBStateBackend rocksDbBackend = +new EmbeddedRocksDBStateBackend().configure(conf, getClass().getClassLoader()); +final String dbStoragePath = new Path(folder.toURI().toString()).toString(); +rocksDbBackend.setDbStoragePath(dbStoragePath); + +final MockEnvironment env = getMockEnvironment(tempFolder.newFolder()); +RocksDBKeyedStateBackend keyedBackend = +createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE); +// clear unused file +FileUtils.deleteFileOrDirectory(logFile); Review Comment: Good idea, I will resolve it in the next commit.
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
liming30 commented on code in PR #23922: URL: https://github.com/apache/flink/pull/23922#discussion_r1430987609 ## flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java: ## @@ -359,6 +365,58 @@ public void testDbPathRelativePaths() throws Exception { rocksDbBackend.setDbStoragePath("relative/path"); } +@Test +@Timeout(value = 60) +public void testCleanRelocatedDbLogs() throws Exception { +final File folder = tempFolder.newFolder(); +final File relocatedDBLogDir = tempFolder.newFolder("db_logs"); +final File logFile = new File(relocatedDBLogDir, "taskManager.log"); +Files.createFile(logFile.toPath()); +System.setProperty("log.file", logFile.getAbsolutePath()); + +Configuration conf = new Configuration(); +conf.set(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.DEBUG_LEVEL); +conf.set(RocksDBConfigurableOptions.LOG_FILE_NUM, 4); +conf.set(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE, MemorySize.parse("1kb")); +final EmbeddedRocksDBStateBackend rocksDbBackend = +new EmbeddedRocksDBStateBackend().configure(conf, getClass().getClassLoader()); +final String dbStoragePath = new Path(folder.toURI().toString()).toString(); +rocksDbBackend.setDbStoragePath(dbStoragePath); + +final MockEnvironment env = getMockEnvironment(tempFolder.newFolder()); +RocksDBKeyedStateBackend keyedBackend = +createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE); +// clear unused file +FileUtils.deleteFileOrDirectory(logFile); + +File instanceBasePath = keyedBackend.getInstanceBasePath(); +File instanceRocksDBPath = + RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath); + +// avoid tests without relocate. +Assume.assumeTrue(instanceRocksDBPath.getAbsolutePath().length() <= 255 - "_LOG".length()); Review Comment: Sorry, I cannot guarantee that the length of `instanceRocksDBPath` will be less than `255 - "_LOG".length()` in this test, which means that relocation may not occur. 
Forcibly specifying an absolute path may cause problems such as the path not existing or missing permissions on the test machine. Is there any other way to ensure that relocation will definitely happen?
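The assumption the `Assume.assumeTrue` line encodes can be stated as a tiny predicate (a hypothetical helper, not part of the PR, assuming the 255-character file-name limit and `_LOG` suffix discussed in the review):

```java
public class RelocationCheck {
    // RocksDB escapes the db path into the relocated LOG file name and
    // appends "_LOG"; relocation can only occur when the result fits the
    // assumed 255-character file-name limit.
    static boolean relocationExpected(String instanceRocksDBPath) {
        return instanceRocksDBPath.length() <= 255 - "_LOG".length();
    }
}
```

This is why a deeply nested temp folder can silently turn the test into a no-op: the predicate becomes false and relocation never happens.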
[jira] [Commented] (FLINK-33588) Fix Flink Checkpointing Statistics Bug
[ https://issues.apache.org/jira/browse/FLINK-33588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798442#comment-17798442 ] Tongtong Zhu commented on FLINK-33588: -- [~jingge] I have tested this change on a local cluster using the latest Flink code and there are no issues. The screenshot shows the test results for [https://github.com/apache/flink/pull/23931] (Verifying this change), which completely fixes this bug. I have also run many test cases before. !image-2023-12-19-14-03-27-062.png|width=934,height=655!
[jira] [Updated] (FLINK-33588) Fix Flink Checkpointing Statistics Bug
[ https://issues.apache.org/jira/browse/FLINK-33588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tongtong Zhu updated FLINK-33588: - Attachment: image-2023-12-19-14-03-27-062.png
[jira] [Updated] (FLINK-33588) Fix Flink Checkpointing Statistics Bug
[ https://issues.apache.org/jira/browse/FLINK-33588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tongtong Zhu updated FLINK-33588: - Attachment: image-2023-12-19-14-02-49-083.png
Re: [PR] [FLINK-32877][Filesystem]add HTTP options to gcs-cloud-storage client [flink]
singhravidutt commented on PR #23226: URL: https://github.com/apache/flink/pull/23226#issuecomment-1862159340 > Also @singhravidutt what's your Jira user name, so I can assign this ticket to you? You can assign it to me now. My Jira username is: singhravidutt
[jira] [Commented] (FLINK-32346) JdbcNumericBetweenParametersProvider Sharding key boundaries large storage long integer overflow, use BigDecimal instead Long
[ https://issues.apache.org/jira/browse/FLINK-32346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798422#comment-17798422 ] zhilinli commented on FLINK-32346: -- The documentation and the code currently diverge. The documentation says "The scan.partition.column must be a numeric, date, or timestamp column from the table in question"; however, the code only supports numeric types (no wider than long). [~libenchao] > JdbcNumericBetweenParametersProvider Sharding key boundaries large storage > long integer overflow, use BigDecimal instead Long > -- > > Key: FLINK-32346 > URL: https://issues.apache.org/jira/browse/FLINK-32346 > Project: Flink > Issue Type: Improvement > Components: Connectors / JDBC >Reporter: zhilinli >Priority: Major > Attachments: image-2023-06-15-16-42-16-773.png, > image-2023-06-15-16-46-13-188.png > > > *JdbcNumericBetweenParametersProvider.class* > Sharding key boundaries large storage long integer overflow, use BigDecimal > instead Long, so that length types such as DecimalType(30,0) are compatible > and LONG cannot be stored Can be assigned to me and I want to complete it > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-32346) JdbcNumericBetweenParametersProvider Sharding key boundaries large storage long integer overflow, use BigDecimal instead Long
[ https://issues.apache.org/jira/browse/FLINK-32346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798422#comment-17798422 ] zhilinli edited comment on FLINK-32346 at 12/19/23 4:38 AM: Due to some unusual database designs, the primary key of many tables is a UUID string. I think we should support this as a priority, and I am working out how to implement it. [~libenchao] was (Author: JIRAUSER291519): Currently the documentation and the code already diverge. The document says "The scan.partition.column must be a numeric, date, or timestamp column from the table in question", however the code only supports numeric (not bigger than long). [~libenchao]
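The overflow FLINK-32346 describes can be sketched by contrasting the two arithmetics (hypothetical helpers, not the connector's actual code): the span between sharding-key boundaries silently wraps in `long` arithmetic, while `BigDecimal` stays exact and also covers `DECIMAL(30,0)` keys that cannot fit in a `long` at all.

```java
import java.math.BigDecimal;

public class ShardBoundaries {
    static long spanAsLong(long min, long max) {
        return max - min; // silently wraps for very wide key ranges
    }

    static BigDecimal spanAsBigDecimal(BigDecimal min, BigDecimal max) {
        return max.subtract(min); // exact, arbitrary precision
    }
}
```

For boundaries near `Long.MIN_VALUE` and `Long.MAX_VALUE`, `spanAsLong` returns a negative number, which is the kind of corruption the proposed `BigDecimal`-based provider avoids.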
Re: [PR] [FLINK-31650][metrics][rest] Remove transient metrics for subtasks in terminal state [flink]
X-czh commented on PR #23447: URL: https://github.com/apache/flink/pull/23447#issuecomment-1862095289 @wanglijie95 Squashed and rebased, thanks!
Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]
leonardBang commented on PR #23511: URL: https://github.com/apache/flink/pull/23511#issuecomment-1862084984 Hi, @HuangZhenQiu Would you like to rebase the PR onto the latest master branch?
Re: [PR] [FLINK-32881][checkpoint] Support triggering savepoint in detach mode for CLI and dumping all pending savepoint ids by rest api [flink]
xiangforever2014 commented on PR #23253: URL: https://github.com/apache/flink/pull/23253#issuecomment-1862077615 @masteryhx kindly remind~
Re: [PR] [FLINK-33861] Implement restore tests for WindowRank node [flink]
flinkbot commented on PR #23956: URL: https://github.com/apache/flink/pull/23956#issuecomment-1862077122 ## CI report: * 8db6276722e5ead414953e0c782ca1b062e902a2 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-33877][streaming] Fix unstable test of CollectSinkFunctionTest.testConfiguredPortIsUsed [flink]
flinkbot commented on PR #23955: URL: https://github.com/apache/flink/pull/23955#issuecomment-1862076600 ## CI report: * e5da58df5653766d430260ae75c1c5f186652b8b UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]
wuchong commented on PR #23511: URL: https://github.com/apache/flink/pull/23511#issuecomment-1862075968 Hi @PatrickRen @leonardBang, do you have time to help review this PR?
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
1996fanrui commented on code in PR #23922: URL: https://github.com/apache/flink/pull/23922#discussion_r1430885170

## flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:

@@ -359,6 +365,58 @@ public void testDbPathRelativePaths() throws Exception {
     rocksDbBackend.setDbStoragePath("relative/path");
 }

+@Test
+@Timeout(value = 60)
+public void testCleanRelocatedDbLogs() throws Exception {
+    final File folder = tempFolder.newFolder();
+    final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
+    final File logFile = new File(relocatedDBLogDir, "taskManager.log");
+    Files.createFile(logFile.toPath());
+    System.setProperty("log.file", logFile.getAbsolutePath());
+
+    Configuration conf = new Configuration();
+    conf.set(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.DEBUG_LEVEL);
+    conf.set(RocksDBConfigurableOptions.LOG_FILE_NUM, 4);
+    conf.set(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE, MemorySize.parse("1kb"));
+    final EmbeddedRocksDBStateBackend rocksDbBackend =
+            new EmbeddedRocksDBStateBackend().configure(conf, getClass().getClassLoader());
+    final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+    rocksDbBackend.setDbStoragePath(dbStoragePath);
+
+    final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
+    RocksDBKeyedStateBackend keyedBackend =
+            createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+    // clear unused file
+    FileUtils.deleteFileOrDirectory(logFile);

Review Comment: We don't need to clean the `taskManager.log`, right? If so, we can check that only the rocksdb log is cleaned. (We should ensure rocksdb doesn't clean other logs.)
## flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:

@@ -359,6 +365,58 @@ public void testDbPathRelativePaths() throws Exception {
     rocksDbBackend.setDbStoragePath("relative/path");
 }

+@Test
+@Timeout(value = 60)
+public void testCleanRelocatedDbLogs() throws Exception {
+    final File folder = tempFolder.newFolder();
+    final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
+    final File logFile = new File(relocatedDBLogDir, "taskManager.log");
+    Files.createFile(logFile.toPath());
+    System.setProperty("log.file", logFile.getAbsolutePath());
+
+    Configuration conf = new Configuration();
+    conf.set(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.DEBUG_LEVEL);
+    conf.set(RocksDBConfigurableOptions.LOG_FILE_NUM, 4);
+    conf.set(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE, MemorySize.parse("1kb"));
+    final EmbeddedRocksDBStateBackend rocksDbBackend =
+            new EmbeddedRocksDBStateBackend().configure(conf, getClass().getClassLoader());
+    final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+    rocksDbBackend.setDbStoragePath(dbStoragePath);
+
+    final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
+    RocksDBKeyedStateBackend keyedBackend =
+            createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+    // clear unused file
+    FileUtils.deleteFileOrDirectory(logFile);
+
+    File instanceBasePath = keyedBackend.getInstanceBasePath();
+    File instanceRocksDBPath =
+            RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath);
+
+    // avoid tests without relocate.
+    Assume.assumeTrue(instanceRocksDBPath.getAbsolutePath().length() <= 255 - "_LOG".length());

Review Comment: Could we ensure the relocation always takes effect in the current test? If we cannot ensure it, this test will be ignored, and we won't catch this bug if the log cleanup logic breaks in the future. WDYT?
[jira] [Commented] (FLINK-33877) CollectSinkFunctionTest.testConfiguredPortIsUsed fails due to BindException
[ https://issues.apache.org/jira/browse/FLINK-33877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798417#comment-17798417 ] Jiabao Sun commented on FLINK-33877: Based on the exception stack trace, it is possible that port 50500 is being used by other test cases. To make this test more stable, we may need to use a random port. Hi [~jingge], could you help review it when you have time? Thanks. > CollectSinkFunctionTest.testConfiguredPortIsUsed fails due to BindException > --- > > Key: FLINK-33877 > URL: https://issues.apache.org/jira/browse/FLINK-33877 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.19.0 >Reporter: Jiabao Sun >Priority: Critical > Labels: pull-request-available, test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=55646=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9482 > {noformat} > Dec 18 17:49:57 17:49:57.241 [ERROR] > org.apache.flink.streaming.api.operators.collect.CollectSinkFunctionTest.testConfiguredPortIsUsed > -- Time elapsed: 0.021 s <<< ERROR! 
> Dec 18 17:49:57 java.net.BindException: Address already in use (Bind failed) > Dec 18 17:49:57 at java.net.PlainSocketImpl.socketBind(Native Method) > Dec 18 17:49:57 at > java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387) > Dec 18 17:49:57 at java.net.ServerSocket.bind(ServerSocket.java:390) > Dec 18 17:49:57 at java.net.ServerSocket.<init>(ServerSocket.java:252) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunction$ServerThread.<init>(CollectSinkFunction.java:375) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunction$ServerThread.<init>(CollectSinkFunction.java:362) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunction.open(CollectSinkFunction.java:252) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.utils.CollectSinkFunctionTestWrapper.openFunction(CollectSinkFunctionTestWrapper.java:103) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunctionTest.testConfiguredPortIsUsed(CollectSinkFunctionTest.java:138) > Dec 18 17:49:57 at java.lang.reflect.Method.invoke(Method.java:498) > Dec 18 17:49:57 at > java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > {noformat}
[PR] [FLINK-33861] Implement restore tests for WindowRank node [flink]
bvarghese1 opened a new pull request, #23956: URL: https://github.com/apache/flink/pull/23956 ## What is the purpose of the change *Add restore tests for WindowRank node* ## Verifying this change This change added tests and can be verified as follows: - Added restore tests for WindowRank node which verifies the generated compiled plan with the saved compiled plan ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
[jira] [Updated] (FLINK-33861) Implement restore tests for WindowRank node
[ https://issues.apache.org/jira/browse/FLINK-33861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33861: --- Labels: pull-request-available (was: ) > Implement restore tests for WindowRank node > --- > > Key: FLINK-33861 > URL: https://issues.apache.org/jira/browse/FLINK-33861 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Bonnie Varghese >Assignee: Bonnie Varghese >Priority: Major > Labels: pull-request-available >
[jira] [Updated] (FLINK-33877) CollectSinkFunctionTest.testConfiguredPortIsUsed fails due to BindException
[ https://issues.apache.org/jira/browse/FLINK-33877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33877: --- Labels: pull-request-available test-stability (was: test-stability) > CollectSinkFunctionTest.testConfiguredPortIsUsed fails due to BindException > --- > > Key: FLINK-33877 > URL: https://issues.apache.org/jira/browse/FLINK-33877 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.19.0 >Reporter: Jiabao Sun >Priority: Critical > Labels: pull-request-available, test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=55646=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9482 > {noformat} > Dec 18 17:49:57 17:49:57.241 [ERROR] > org.apache.flink.streaming.api.operators.collect.CollectSinkFunctionTest.testConfiguredPortIsUsed > -- Time elapsed: 0.021 s <<< ERROR! > Dec 18 17:49:57 java.net.BindException: Address already in use (Bind failed) > Dec 18 17:49:57 at java.net.PlainSocketImpl.socketBind(Native Method) > Dec 18 17:49:57 at > java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387) > Dec 18 17:49:57 at java.net.ServerSocket.bind(ServerSocket.java:390) > Dec 18 17:49:57 at java.net.ServerSocket.<init>(ServerSocket.java:252) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunction$ServerThread.<init>(CollectSinkFunction.java:375) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunction$ServerThread.<init>(CollectSinkFunction.java:362) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunction.open(CollectSinkFunction.java:252) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.utils.CollectSinkFunctionTestWrapper.openFunction(CollectSinkFunctionTestWrapper.java:103) > Dec 18 17:49:57 at > 
org.apache.flink.streaming.api.operators.collect.CollectSinkFunctionTest.testConfiguredPortIsUsed(CollectSinkFunctionTest.java:138) > Dec 18 17:49:57 at java.lang.reflect.Method.invoke(Method.java:498) > Dec 18 17:49:57 at > java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > {noformat}
[PR] [FLINK-33877][streaming] Fix unstable test of CollectSinkFunctionTest.testConfiguredPortIsUsed [flink]
Jiabao-Sun opened a new pull request, #23955: URL: https://github.com/apache/flink/pull/23955 ## What is the purpose of the change [FLINK-33877][streaming] Fix unstable test of CollectSinkFunctionTest.testConfiguredPortIsUsed https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=55646=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9482 ``` Dec 18 17:49:57 17:49:57.241 [ERROR] org.apache.flink.streaming.api.operators.collect.CollectSinkFunctionTest.testConfiguredPortIsUsed -- Time elapsed: 0.021 s <<< ERROR! Dec 18 17:49:57 java.net.BindException: Address already in use (Bind failed) Dec 18 17:49:57 at java.net.PlainSocketImpl.socketBind(Native Method) Dec 18 17:49:57 at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387) Dec 18 17:49:57 at java.net.ServerSocket.bind(ServerSocket.java:390) Dec 18 17:49:57 at java.net.ServerSocket.<init>(ServerSocket.java:252) Dec 18 17:49:57 at org.apache.flink.streaming.api.operators.collect.CollectSinkFunction$ServerThread.<init>(CollectSinkFunction.java:375) Dec 18 17:49:57 at org.apache.flink.streaming.api.operators.collect.CollectSinkFunction$ServerThread.<init>(CollectSinkFunction.java:362) Dec 18 17:49:57 at org.apache.flink.streaming.api.operators.collect.CollectSinkFunction.open(CollectSinkFunction.java:252) Dec 18 17:49:57 at org.apache.flink.streaming.api.operators.collect.utils.CollectSinkFunctionTestWrapper.openFunction(CollectSinkFunctionTestWrapper.java:103) Dec 18 17:49:57 at org.apache.flink.streaming.api.operators.collect.CollectSinkFunctionTest.testConfiguredPortIsUsed(CollectSinkFunctionTest.java:138) Dec 18 17:49:57 at java.lang.reflect.Method.invoke(Method.java:498) Dec 18 17:49:57 at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) Dec 18 17:49:57 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) Dec 18 17:49:57 at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) Dec 18 17:49:57 at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) Dec 18 17:49:57 at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) ``` ## Brief change log Based on the exception stack trace, it is possible that port 50500 is being used by other test cases. To make this test more stable, we may need to use a random port. ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented)
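The random-port idea mentioned in the change log above can be sketched as follows. This illustrates the general technique (binding to port 0 so the OS assigns a currently free ephemeral port) and is not the PR's actual code; the names are illustrative:

```java
import java.io.IOException;
import java.net.ServerSocket;

/**
 * Sketch of the random-port approach: instead of hard-coding a port such as
 * 50500 (which another test may already hold), ask the OS for a free one by
 * binding to port 0. Illustrative names, not the test's actual API.
 */
public class RandomPortExample {

    /** Probes for a currently free port by binding to 0 and reading the assigned port. */
    static int findFreePort() throws IOException {
        try (ServerSocket probe = new ServerSocket(0)) {
            return probe.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        int port = findFreePort();
        // The test would then configure the sink with this port instead of 50500.
        try (ServerSocket server = new ServerSocket(port)) {
            if (server.getLocalPort() != port) {
                throw new AssertionError("bound to wrong port");
            }
            System.out.println("bound to free port " + port);
        }
    }
}
```

Note that a port probed this way can, in principle, be taken by another process between closing the probe and the real bind, so tests typically combine the random port with a retry on `BindException`.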
Re: [PR] [FLINK-33795] Add a new config to forbid autoscale execution in the configured excluded periods [flink-kubernetes-operator]
flashJd commented on code in PR #728: URL: https://github.com/apache/flink-kubernetes-operator/pull/728#discussion_r1430882288

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:

@@ -143,7 +145,11 @@ public CollectedMetricHistory updateMetrics(
     if (now.isBefore(windowFullTime)) {
         LOG.info("Metric window not full until {}", windowFullTime);
     } else {
-        collectedMetrics.setFullyCollected(true);
+        if (isExcluded) {
+            LOG.info("autoscaling now in excluded period");

Review Comment: Agreed. Adjustment has been made
Re: [PR] [FLINK-33502][runtime] The preparation step of ScheduledSubpartitionReader should not shut down the process [flink]
flinkbot commented on PR #23954: URL: https://github.com/apache/flink/pull/23954#issuecomment-1862054502 ## CI report: * ba611f6691b0383e2861828d2e3d968d1d4aa74d UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Commented] (FLINK-33502) HybridShuffleITCase caused a fatal error
[ https://issues.apache.org/jira/browse/FLINK-33502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798413#comment-17798413 ] Wencong Liu commented on FLINK-33502: - Sorry for the late reply. I've just identified the issue and proposed a fix; it should be stable now. [~mapohl] > HybridShuffleITCase caused a fatal error > > > Key: FLINK-33502 > URL: https://issues.apache.org/jira/browse/FLINK-33502 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.19.0 >Reporter: Matthias Pohl >Assignee: Wencong Liu >Priority: Major > Labels: pull-request-available, test-stability > Fix For: 1.19.0 > > Attachments: image-2023-11-20-14-37-37-321.png > > > [https://github.com/XComp/flink/actions/runs/6789774296/job/18458197040#step:12:9177] > {code:java} > Error: 21:21:35 21:21:35.379 [ERROR] Error occurred in starting fork, check > output in log > 9168Error: 21:21:35 21:21:35.379 [ERROR] Process Exit Code: 239 > 9169Error: 21:21:35 21:21:35.379 [ERROR] Crashed tests: > 9170Error: 21:21:35 21:21:35.379 [ERROR] > org.apache.flink.test.runtime.HybridShuffleITCase > 9171Error: 21:21:35 21:21:35.379 [ERROR] > org.apache.maven.surefire.booter.SurefireBooterForkException: > ExecutionException The forked VM terminated without properly saying goodbye. > VM crash or System.exit called? 
> 9172Error: 21:21:35 21:21:35.379 [ERROR] Command was /bin/sh -c cd > /root/flink/flink-tests && /usr/lib/jvm/jdk-11.0.19+7/bin/java -XX:+UseG1GC > -Xms256m -XX:+IgnoreUnrecognizedVMOptions > --add-opens=java.base/java.util=ALL-UNNAMED > --add-opens=java.base/java.io=ALL-UNNAMED -Xmx1536m -jar > /root/flink/flink-tests/target/surefire/surefirebooter10811559899200556131.jar > /root/flink/flink-tests/target/surefire 2023-11-07T20-32-50_466-jvmRun4 > surefire6242806641230738408tmp surefire_1603959900047297795160tmp > 9173Error: 21:21:35 21:21:35.379 [ERROR] Error occurred in starting fork, > check output in log > 9174Error: 21:21:35 21:21:35.379 [ERROR] Process Exit Code: 239 > 9175Error: 21:21:35 21:21:35.379 [ERROR] Crashed tests: > 9176Error: 21:21:35 21:21:35.379 [ERROR] > org.apache.flink.test.runtime.HybridShuffleITCase > 9177Error: 21:21:35 21:21:35.379 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:532) > 9178Error: 21:21:35 21:21:35.379 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.runSuitesForkPerTestSet(ForkStarter.java:479) > 9179Error: 21:21:35 21:21:35.379 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:322) > 9180Error: 21:21:35 21:21:35.379 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:266) > [...] {code}
[PR] [FLINK-33502][runtime] The preparation step of ScheduledSubpartitionReader should not shut down the process [flink]
WencongLiu opened a new pull request, #23954: URL: https://github.com/apache/flink/pull/23954 ## What is the purpose of the change *The preparation step of ScheduledSubpartitionReader should not shut down the process.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
[jira] [Updated] (FLINK-33877) CollectSinkFunctionTest.testConfiguredPortIsUsed fails due to BindException
[ https://issues.apache.org/jira/browse/FLINK-33877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun updated FLINK-33877: --- Labels: test-stability (was: ) > CollectSinkFunctionTest.testConfiguredPortIsUsed fails due to BindException > --- > > Key: FLINK-33877 > URL: https://issues.apache.org/jira/browse/FLINK-33877 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.19.0 >Reporter: Jiabao Sun >Priority: Critical > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=55646=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9482 > {noformat} > Dec 18 17:49:57 17:49:57.241 [ERROR] > org.apache.flink.streaming.api.operators.collect.CollectSinkFunctionTest.testConfiguredPortIsUsed > -- Time elapsed: 0.021 s <<< ERROR! > Dec 18 17:49:57 java.net.BindException: Address already in use (Bind failed) > Dec 18 17:49:57 at java.net.PlainSocketImpl.socketBind(Native Method) > Dec 18 17:49:57 at > java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387) > Dec 18 17:49:57 at java.net.ServerSocket.bind(ServerSocket.java:390) > Dec 18 17:49:57 at java.net.ServerSocket.<init>(ServerSocket.java:252) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunction$ServerThread.<init>(CollectSinkFunction.java:375) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunction$ServerThread.<init>(CollectSinkFunction.java:362) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunction.open(CollectSinkFunction.java:252) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.utils.CollectSinkFunctionTestWrapper.openFunction(CollectSinkFunctionTestWrapper.java:103) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunctionTest.testConfiguredPortIsUsed(CollectSinkFunctionTest.java:138) > Dec 18 17:49:57 at 
java.lang.reflect.Method.invoke(Method.java:498) > Dec 18 17:49:57 at > java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > {noformat}
Re: [PR] [FLINK-31650][metrics][rest] Remove transient metrics for subtasks in terminal state [flink]
X-czh commented on code in PR #23447: URL: https://github.com/apache/flink/pull/23447#discussion_r1430859019

## flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:

@@ -570,6 +578,19 @@ void retainSubtasks(Set activeSubtasks) {
     subtasks.keySet().retainAll(activeSubtasks);
 }

+void removeTransientMetrics(int subtaskIndex) {
+    if (subtasks.containsKey(subtaskIndex)) {
+        // Remove in both places as task metrics are duplicated in task metric store and
+        // subtask metric store for metrics query on WebInterface.
+        metrics.keySet()

Review Comment: Addressed, thx
Re: [PR] [FLINK-31650][metrics][rest] Remove transient metrics for subtasks in terminal state [flink]
JunRuiLee commented on code in PR #23447: URL: https://github.com/apache/flink/pull/23447#discussion_r1430850407

## flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:

@@ -570,6 +578,19 @@ void retainSubtasks(Set activeSubtasks) {
     subtasks.keySet().retainAll(activeSubtasks);
 }

+void removeTransientMetrics(int subtaskIndex) {
+    if (subtasks.containsKey(subtaskIndex)) {
+        // Remove in both places as task metrics are duplicated in task metric store and
+        // subtask metric store for metrics query on WebInterface.
+        metrics.keySet()

Review Comment: I prefer not specifically mentioning 'query on WebInterface' since the metric store also exposes metrics to users through the REST API.
Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]
lincoln-lil commented on code in PR #23827: URL: https://github.com/apache/flink/pull/23827#discussion_r1430848529

## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala:

@@ -1916,4 +1916,44 @@ class AggregateITCase(aggMode: AggMode, miniBatch: MiniBatchMode, backend: State
     )
     assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
 }
+
+@TestTemplate
+def testGroupJsonObjectAggWithRetract(): Unit = {
+    val data = new mutable.MutableList[(Long, String, Long)]
+    data.+=((2L, "Hallo", 2L))

Review Comment: This can be simplified, e.g., use a for loop
Re: [PR] [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]
flinkbot commented on PR #23953: URL: https://github.com/apache/flink/pull/23953#issuecomment-1861985105 ## CI report: * 0165b2d94c1464a19c1b74f77e2fbaa1e5e236eb UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-33878) Many Keyed Operators extends `TableStreamOperator` which is marked without key.
[ https://issues.apache.org/jira/browse/FLINK-33878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang updated FLINK-33878: --- Summary: Many Keyed Operators extends `TableStreamOperator` which is marked without key. (was: Many Keyed Operator extends `TableStreamOperator` which is marked without key. ) > Many Keyed Operators extends `TableStreamOperator` which is marked without > key. > > > Key: FLINK-33878 > URL: https://issues.apache.org/jira/browse/FLINK-33878 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Runtime >Reporter: xuyang >Priority: Minor > > Many Keyed Operator like `WindowJoinOperator` and `SlicingWindowOperator` > extends `TableStreamOperator` which is marked without key.
[jira] [Commented] (FLINK-33878) Many Keyed Operator extends `TableStreamOperator` which is marked without key.
[ https://issues.apache.org/jira/browse/FLINK-33878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798406#comment-17798406 ] xuyang commented on FLINK-33878: Do you think we should fix it? If yes, I want to take this Jira and fix it later. > Many Keyed Operator extends `TableStreamOperator` which is marked without > key. > --- > > Key: FLINK-33878 > URL: https://issues.apache.org/jira/browse/FLINK-33878 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Runtime >Reporter: xuyang >Priority: Minor > > Many Keyed Operator like `WindowJoinOperator` and `SlicingWindowOperator` > extends `TableStreamOperator` which is marked without key.
[jira] [Created] (FLINK-33878) Many Keyed Operator extends `TableStreamOperator` which is marked without key.
xuyang created FLINK-33878: -- Summary: Many Keyed Operator extends `TableStreamOperator` which is marked without key. Key: FLINK-33878 URL: https://issues.apache.org/jira/browse/FLINK-33878 Project: Flink Issue Type: Technical Debt Components: Table SQL / Runtime Reporter: xuyang Many Keyed Operator like `WindowJoinOperator` and `SlicingWindowOperator` extends `TableStreamOperator` which is marked without key.
[jira] [Commented] (FLINK-33878) Many Keyed Operator extends `TableStreamOperator` which is marked without key.
[ https://issues.apache.org/jira/browse/FLINK-33878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798405#comment-17798405 ] xuyang commented on FLINK-33878: cc [~qingyue] > Many Keyed Operator extends `TableStreamOperator` which is marked without > key. > --- > > Key: FLINK-33878 > URL: https://issues.apache.org/jira/browse/FLINK-33878 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Runtime >Reporter: xuyang >Priority: Minor > > Many Keyed Operator like `WindowJoinOperator` and `SlicingWindowOperator` > extends `TableStreamOperator` which is marked without key.
Re: [PR] [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]
yunfengzhou-hub commented on PR #23953: URL: https://github.com/apache/flink/pull/23953#issuecomment-1861973471 Hi @reswqa could you please take a look at this PR?
[PR] [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]
yunfengzhou-hub opened a new pull request, #23953: URL: https://github.com/apache/flink/pull/23953 ## What is the purpose of the change This pull request adds support for configuring PARALLELISM_OVERRIDES during job submission. Before this PR, Flink only allowed configuring this parameter before the Dispatcher (that is, the cluster) starts. ## Brief change log - Pass PARALLELISM_OVERRIDES from the environment to the job graph. - Make the dispatcher use PARALLELISM_OVERRIDES from the job graph before submitting the job. ## Verifying this change This change is covered by modified and newly added tests in JarRunHandlerParameterTest and DispatcherTest. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
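The override mechanism the PR description walks through can be sketched in plain Java. This is an illustrative model only, not Flink's implementation: `parseOverrides` and `applyOverrides` are hypothetical names, and the real option (`PipelineOptions.PARALLELISM_OVERRIDES`) is applied to `JobVertex` objects inside the dispatcher rather than to a bare map.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a PARALLELISM_OVERRIDES-style mechanism: a map from job-vertex ID to
// desired parallelism is parsed from a config string, then applied over the
// parallelisms already configured on the job graph. Vertices not mentioned in
// the overrides keep their original parallelism, and overrides for unknown
// vertices are ignored.
public class ParallelismOverridesSketch {

    // Parse a "vertexId:parallelism,vertexId:parallelism" string into a map.
    // (The exact serialization format is an assumption for this sketch.)
    static Map<String, Integer> parseOverrides(String spec) {
        Map<String, Integer> overrides = new HashMap<>();
        if (spec == null || spec.isEmpty()) {
            return overrides;
        }
        for (String entry : spec.split(",")) {
            String[] kv = entry.split(":");
            overrides.put(kv[0].trim(), Integer.parseInt(kv[1].trim()));
        }
        return overrides;
    }

    // Apply overrides to the current vertex parallelisms without adding new vertices.
    static Map<String, Integer> applyOverrides(
            Map<String, Integer> vertexParallelism, Map<String, Integer> overrides) {
        Map<String, Integer> result = new HashMap<>(vertexParallelism);
        overrides.forEach((vertex, p) -> result.computeIfPresent(vertex, (k, old) -> p));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> graph = new HashMap<>();
        graph.put("source", 1);
        graph.put("map", 1);
        Map<String, Integer> applied =
                applyOverrides(graph, parseOverrides("map:4,unknown:8"));
        System.out.println(applied.get("map"));    // 4
        System.out.println(applied.get("source")); // 1
    }
}
```

The key point of the PR is where this application happens: before the change, only the dispatcher's own configuration was consulted; afterwards, overrides carried on the submitted job graph take effect too.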
[jira] [Closed] (FLINK-33534) PipelineOptions.PARALLELISM_OVERRIDES is not picked up from jar submission request
[ https://issues.apache.org/jira/browse/FLINK-33534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo closed FLINK-33534. -- Fix Version/s: 1.19.0 Resolution: Fixed master(1.19) via 4cc24c1dd17b0abe3c4372652c7ab88fedc7e478. > PipelineOptions.PARALLELISM_OVERRIDES is not picked up from jar submission > request > -- > > Key: FLINK-33534 > URL: https://issues.apache.org/jira/browse/FLINK-33534 > Project: Flink > Issue Type: Bug > Components: Runtime / Configuration, Runtime / REST >Affects Versions: 1.18.0, 1.17.1 >Reporter: Gyula Fora >Assignee: Yunfeng Zhou >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > PARALLELISM_OVERRIDES are currently only applied when they are part of the > JobManager / Cluster configuration. > When this config is provided as part of the JarRunRequestBody it is > completely ignored and does not take effect. > The main reason is that the dispatcher reads this value from its own > configuration object and does not include the extra configs passed through > the rest request. > This is a blocker for properly supporting the autoscaler for > FlinkSessionJobs
Re: [PR] [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]
reswqa merged PR #23944: URL: https://github.com/apache/flink/pull/23944
Re: [PR] [FLINK-33876][table-planner][JUnit5 Migration] Introduce methodName method in TableTestBase [flink]
Jiabao-Sun commented on PR #23952: URL: https://github.com/apache/flink/pull/23952#issuecomment-1861943681 @flinkbot run azure
[jira] [Created] (FLINK-33877) CollectSinkFunctionTest.testConfiguredPortIsUsed fails due to BindException
Jiabao Sun created FLINK-33877: -- Summary: CollectSinkFunctionTest.testConfiguredPortIsUsed fails due to BindException Key: FLINK-33877 URL: https://issues.apache.org/jira/browse/FLINK-33877 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.19.0 Reporter: Jiabao Sun https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=55646=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9482 {noformat} Dec 18 17:49:57 17:49:57.241 [ERROR] org.apache.flink.streaming.api.operators.collect.CollectSinkFunctionTest.testConfiguredPortIsUsed -- Time elapsed: 0.021 s <<< ERROR! Dec 18 17:49:57 java.net.BindException: Address already in use (Bind failed) Dec 18 17:49:57 at java.net.PlainSocketImpl.socketBind(Native Method) Dec 18 17:49:57 at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387) Dec 18 17:49:57 at java.net.ServerSocket.bind(ServerSocket.java:390) Dec 18 17:49:57 at java.net.ServerSocket.<init>(ServerSocket.java:252) Dec 18 17:49:57 at org.apache.flink.streaming.api.operators.collect.CollectSinkFunction$ServerThread.<init>(CollectSinkFunction.java:375) Dec 18 17:49:57 at org.apache.flink.streaming.api.operators.collect.CollectSinkFunction$ServerThread.<init>(CollectSinkFunction.java:362) Dec 18 17:49:57 at org.apache.flink.streaming.api.operators.collect.CollectSinkFunction.open(CollectSinkFunction.java:252) Dec 18 17:49:57 at org.apache.flink.streaming.api.operators.collect.utils.CollectSinkFunctionTestWrapper.openFunction(CollectSinkFunctionTestWrapper.java:103) Dec 18 17:49:57 at org.apache.flink.streaming.api.operators.collect.CollectSinkFunctionTest.testConfiguredPortIsUsed(CollectSinkFunctionTest.java:138) Dec 18 17:49:57 at java.lang.reflect.Method.invoke(Method.java:498) Dec 18 17:49:57 at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) Dec 18 17:49:57 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) Dec 18 17:49:57 at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) Dec 18 17:49:57 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) Dec 18 17:49:57 at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) {noformat}
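A common remedy for this kind of `BindException` flakiness, shown here as a generic JDK sketch rather than the fix actually chosen for FLINK-33877, is to bind to port 0 so the OS assigns a free ephemeral port, then read the real port back instead of hard-coding one:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

// Binding to port 0 asks the OS for any free ephemeral port, which avoids
// "Address already in use" when a fixed test port is taken by another process.
public class EphemeralPortSketch {
    static int bindToFreePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            // The socket is bound for the lifetime of this try block; tests would
            // keep it open (or re-bind quickly) rather than assume it stays free.
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bound to port " + bindToFreePort());
    }
}
```

Note the small race if the socket is closed before re-binding: another process can grab the port in between, which is why holding the bound socket open is the safer variant.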
[jira] [Updated] (FLINK-33877) CollectSinkFunctionTest.testConfiguredPortIsUsed fails due to BindException
[ https://issues.apache.org/jira/browse/FLINK-33877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun updated FLINK-33877: --- Priority: Critical (was: Major) > CollectSinkFunctionTest.testConfiguredPortIsUsed fails due to BindException > --- > > Key: FLINK-33877 > URL: https://issues.apache.org/jira/browse/FLINK-33877 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.19.0 >Reporter: Jiabao Sun >Priority: Critical > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=55646=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9482 > {noformat} > Dec 18 17:49:57 17:49:57.241 [ERROR] > org.apache.flink.streaming.api.operators.collect.CollectSinkFunctionTest.testConfiguredPortIsUsed > -- Time elapsed: 0.021 s <<< ERROR! > Dec 18 17:49:57 java.net.BindException: Address already in use (Bind failed) > Dec 18 17:49:57 at java.net.PlainSocketImpl.socketBind(Native Method) > Dec 18 17:49:57 at > java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387) > Dec 18 17:49:57 at java.net.ServerSocket.bind(ServerSocket.java:390) > Dec 18 17:49:57 at java.net.ServerSocket.<init>(ServerSocket.java:252) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunction$ServerThread.<init>(CollectSinkFunction.java:375) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunction$ServerThread.<init>(CollectSinkFunction.java:362) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunction.open(CollectSinkFunction.java:252) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.utils.CollectSinkFunctionTestWrapper.openFunction(CollectSinkFunctionTestWrapper.java:103) > Dec 18 17:49:57 at > org.apache.flink.streaming.api.operators.collect.CollectSinkFunctionTest.testConfiguredPortIsUsed(CollectSinkFunctionTest.java:138) > Dec 18 17:49:57 at java.lang.reflect.Method.invoke(Method.java:498) > Dec 18 17:49:57 at > java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > Dec 18 17:49:57 at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > {noformat}
[jira] [Commented] (FLINK-33531) Nightly Python fails with NPE at metadataHandlerProvider on AZP (StreamDependencyTests.test_add_python_archive)
[ https://issues.apache.org/jira/browse/FLINK-33531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798370#comment-17798370 ] Sergey Nuyanzin commented on FLINK-33531: - Merged to 1.18.1 as [0d6ab1db6c04cd88f646d545075bea539bac9fcf|https://github.com/apache/flink/commit/0d6ab1db6c04cd88f646d545075bea539bac9fcf] > Nightly Python fails with NPE at metadataHandlerProvider on AZP > (StreamDependencyTests.test_add_python_archive) > --- > > Key: FLINK-33531 > URL: https://issues.apache.org/jira/browse/FLINK-33531 > Project: Flink > Issue Type: Technical Debt > Components: API / Python >Affects Versions: 1.18.0, 1.19.0 >Reporter: Sergey Nuyanzin >Assignee: Xingbo Huang >Priority: Blocker > Labels: pull-request-available, test-stability > Fix For: 1.19.0, 1.18.1 > > > It seems starting 02.11.2023 every master nightly fails with this (that's why > it is a blocker) > for instance > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=54512=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901] > {noformat} > 2023-11-12T02:10:24.5082784Z Nov 12 02:10:24 if is_error(answer)[0]: > 2023-11-12T02:10:24.5083620Z Nov 12 02:10:24 if len(answer) > 1: > 2023-11-12T02:10:24.5084326Z Nov 12 02:10:24 type = answer[1] > 2023-11-12T02:10:24.5085164Z Nov 12 02:10:24 value = > OUTPUT_CONVERTER[type](answer[2:], gateway_client) > 2023-11-12T02:10:24.5086061Z Nov 12 02:10:24 if answer[1] == > REFERENCE_TYPE: > 2023-11-12T02:10:24.5086850Z Nov 12 02:10:24 > raise > Py4JJavaError( > 2023-11-12T02:10:24.5087677Z Nov 12 02:10:24 "An > error occurred while calling {0}{1}{2}.\n". > 2023-11-12T02:10:24.5088538Z Nov 12 02:10:24 > format(target_id, ".", name), value) > 2023-11-12T02:10:24.5089551Z Nov 12 02:10:24 E > py4j.protocol.Py4JJavaError: An error occurred while calling > o3371.executeInsert. 
> 2023-11-12T02:10:24.5090832Z Nov 12 02:10:24 E : > java.lang.NullPointerException: metadataHandlerProvider > 2023-11-12T02:10:24.5091832Z Nov 12 02:10:24 Eat > java.util.Objects.requireNonNull(Objects.java:228) > 2023-11-12T02:10:24.5093399Z Nov 12 02:10:24 Eat > org.apache.calcite.rel.metadata.RelMetadataQueryBase.getMetadataHandlerProvider(RelMetadataQueryBase.java:122) > 2023-11-12T02:10:24.5094480Z Nov 12 02:10:24 Eat > org.apache.calcite.rel.metadata.RelMetadataQueryBase.revise(RelMetadataQueryBase.java:118) > 2023-11-12T02:10:24.5095365Z Nov 12 02:10:24 Eat > org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:844) > 2023-11-12T02:10:24.5096306Z Nov 12 02:10:24 Eat > org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:307) > 2023-11-12T02:10:24.5097238Z Nov 12 02:10:24 Eat > org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:337) > 2023-11-12T02:10:24.5098014Z Nov 12 02:10:24 Eat > org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556) > 2023-11-12T02:10:24.5098753Z Nov 12 02:10:24 Eat > org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:420) > 2023-11-12T02:10:24.5099517Z Nov 12 02:10:24 Eat > org.apache.calcite.plan.hep.HepPlanner.executeRuleInstance(HepPlanner.java:243) > 2023-11-12T02:10:24.5100373Z Nov 12 02:10:24 Eat > org.apache.calcite.plan.hep.HepInstruction$RuleInstance$State.execute(HepInstruction.java:178) > 2023-11-12T02:10:24.5101313Z Nov 12 02:10:24 Eat > org.apache.calcite.plan.hep.HepPlanner.lambda$executeProgram$0(HepPlanner.java:211) > 2023-11-12T02:10:24.5102410Z Nov 12 02:10:24 Eat > org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableList.forEach(ImmutableList.java:422) > 2023-11-12T02:10:24.5103343Z Nov 12 02:10:24 Eat > org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:210) > 2023-11-12T02:10:24.5104105Z Nov 12 02:10:24 Eat > 
org.apache.calcite.plan.hep.HepProgram$State.execute(HepProgram.java:118) > 2023-11-12T02:10:24.5104868Z Nov 12 02:10:24 Eat > org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:205) > 2023-11-12T02:10:24.5105616Z Nov 12 02:10:24 Eat >
Re: [PR] [BP-1.18][FLINK-33531][python] Remove cython upper bounds [flink]
snuyanzin merged PR #23950: URL: https://github.com/apache/flink/pull/23950
[jira] [Commented] (FLINK-27756) Fix intermittently failing test in AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds
[ https://issues.apache.org/jira/browse/FLINK-27756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798369#comment-17798369 ] Jim Hughes commented on FLINK-27756: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=55640=logs=2e8cb2f7-b2d3-5c62-9c05-cd756d33a819=2dd510a3-5041-5201-6dc3-54d310f68906 > Fix intermittently failing test in > AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds > -- > > Key: FLINK-27756 > URL: https://issues.apache.org/jira/browse/FLINK-27756 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.15.0, 1.17.0, 1.19.0 >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Critical > Labels: pull-request-available, stale-assigned, test-stability > Fix For: 1.16.0 > > > h2. Motivation > - One of the integration tests ({{checkLoggedSendTimesAreWithinBounds}}) of > {{AsyncSinkWriterTest}} has been reported to fail intermittently on build > pipeline causing blocking of new changes. > - Reporting build is [linked > |https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36009=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203]
[jira] [Commented] (FLINK-27756) Fix intermittently failing test in AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds
[ https://issues.apache.org/jira/browse/FLINK-27756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798368#comment-17798368 ] Jim Hughes commented on FLINK-27756: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=55636=logs=2e8cb2f7-b2d3-5c62-9c05-cd756d33a819=2dd510a3-5041-5201-6dc3-54d310f68906 > Fix intermittently failing test in > AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds > -- > > Key: FLINK-27756 > URL: https://issues.apache.org/jira/browse/FLINK-27756 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.15.0, 1.17.0, 1.19.0 >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Critical > Labels: pull-request-available, stale-assigned, test-stability > Fix For: 1.16.0 > > > h2. Motivation > - One of the integration tests ({{checkLoggedSendTimesAreWithinBounds}}) of > {{AsyncSinkWriterTest}} has been reported to fail intermittently on build > pipeline causing blocking of new changes. > - Reporting build is [linked > |https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36009=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203]
Re: [PR] [BP-1.18][FLINK-33531][python] Remove cython upper bounds [flink]
snuyanzin commented on PR #23950: URL: https://github.com/apache/flink/pull/23950#issuecomment-1861775006 thanks for taking a look
[PR] Fix a few doc mistakes [flink-kubernetes-operator]
ryanvanhuuksloot opened a new pull request, #736: URL: https://github.com/apache/flink-kubernetes-operator/pull/736 ## What is the purpose of the change Fix a few things in the docs ## Brief change log Fix a few things in the docs ## Verifying this change No verification - simple doc changes. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., are there any changes to the `CustomResourceDescriptors`: no - Core observer or reconciler logic that is regularly executed: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? docs
Re: [PR] Fix a few doc mistakes [flink-kubernetes-operator]
ryanvanhuuksloot commented on PR #736: URL: https://github.com/apache/flink-kubernetes-operator/pull/736#issuecomment-1861544730 Someone with merge privileges please merge after reviewing
Re: [PR] [FLINK-33872] Retrieve checkpoint history for jobs in completed state [flink]
dannycranmer commented on code in PR #23949: URL: https://github.com/apache/flink/pull/23949#discussion_r1430540231 ## flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java: ## @@ -683,6 +685,129 @@ public void testRetrieveCheckpointStats() throws Exception { Assertions.assertThat(resultsFuture).isCompletedWithValue(snapshot); } +@Test +public void testRetrieveCheckpointStatsOnFailedJob() throws Exception { +CheckpointStatsSnapshot snapshot = getTestCheckpointStatsSnapshotWithTwoFailedCheckpoints(); +TestingJobMasterGateway testingJobMasterGateway = +new TestingJobMasterGatewayBuilder().build(); + +dispatcher = +createAndStartDispatcher( +heartbeatServices, +haServices, +new TestingJobMasterGatewayJobManagerRunnerFactory( +testingJobMasterGateway)); + +final JobStatus jobStatus = JobStatus.FAILED; +final ExecutionGraphInfo completedExecutionGraphInfo = +new ExecutionGraphInfo( +new ArchivedExecutionGraphBuilder() +.setJobID(jobId) +.setState(jobStatus) +.setFailureCause( +new ErrorInfo(new RuntimeException("expected"), 1L)) +.setCheckpointStatsSnapshot(snapshot) +.build()); +dispatcher.completeJobExecution(completedExecutionGraphInfo); + +CompletableFuture resultsFuture = +dispatcher.callAsyncInMainThread( +() -> dispatcher.requestCheckpointStats(jobId, TIMEOUT)); + Assertions.assertThat(resultsFuture).succeedsWithin(Duration.ofSeconds(1)); +Assertions.assertThat(resultsFuture).isCompletedWithValue(snapshot); +} Review Comment: CI Failure: > Dec 18 16:52:43 16:52:43.222 [ERROR] DispatcherTest.testRetrieveCheckpointStatsOnNonExistentJob:804
Re: [PR] [FLINK-33872] Retrieve checkpoint history for jobs in completed state [flink]
dannycranmer commented on PR #23949: URL: https://github.com/apache/flink/pull/23949#issuecomment-1861348278 CI Failure: > Dec 18 16:52:43 16:52:43.222 [ERROR] DispatcherTest.testRetrieveCheckpointStatsOnNonExistentJob:804
Re: [PR] [FLINK-33872] Retrieve checkpoint history for jobs in completed state [flink]
dannycranmer commented on code in PR #23949: URL: https://github.com/apache/flink/pull/23949#discussion_r1430538449 ## flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java: ## @@ -683,6 +685,129 @@ public void testRetrieveCheckpointStats() throws Exception { Assertions.assertThat(resultsFuture).isCompletedWithValue(snapshot); } +@Test +public void testRetrieveCheckpointStatsOnFailedJob() throws Exception { +CheckpointStatsSnapshot snapshot = getTestCheckpointStatsSnapshotWithTwoFailedCheckpoints(); +TestingJobMasterGateway testingJobMasterGateway = +new TestingJobMasterGatewayBuilder().build(); + +dispatcher = +createAndStartDispatcher( +heartbeatServices, +haServices, +new TestingJobMasterGatewayJobManagerRunnerFactory( +testingJobMasterGateway)); + +final JobStatus jobStatus = JobStatus.FAILED; +final ExecutionGraphInfo completedExecutionGraphInfo = +new ExecutionGraphInfo( +new ArchivedExecutionGraphBuilder() +.setJobID(jobId) +.setState(jobStatus) +.setFailureCause( +new ErrorInfo(new RuntimeException("expected"), 1L)) +.setCheckpointStatsSnapshot(snapshot) +.build()); +dispatcher.completeJobExecution(completedExecutionGraphInfo); + +CompletableFuture resultsFuture = +dispatcher.callAsyncInMainThread( +() -> dispatcher.requestCheckpointStats(jobId, TIMEOUT)); + Assertions.assertThat(resultsFuture).succeedsWithin(Duration.ofSeconds(1)); +Assertions.assertThat(resultsFuture).isCompletedWithValue(snapshot); +} Review Comment: Can you extract to a method to remove duplication? Can pass in the `JobStatus`
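The reviewer's suggestion of collapsing the near-identical per-status tests into one helper parameterized by `JobStatus` can be sketched as follows. Everything here is illustrative: the enum and helper name stand in for Flink's actual test classes and are not their real signatures.

```java
// Sketch of extracting duplicated test bodies into a single helper: each test
// method supplies only the terminal JobStatus that differs between cases, and
// the shared setup/assert logic lives in one place.
public class CheckpointStatsTestHelperSketch {
    enum JobStatus { FINISHED, FAILED, CANCELED }

    // In the real test this would build an archived execution graph in the given
    // terminal status, complete the job, request checkpoint stats, and assert on
    // the returned snapshot; here it just reports which case was exercised.
    static String runRetrieveCheckpointStatsTest(JobStatus terminalStatus) {
        return "verified checkpoint stats for " + terminalStatus;
    }

    public static void main(String[] args) {
        for (JobStatus status : new JobStatus[] {JobStatus.FAILED, JobStatus.CANCELED}) {
            System.out.println(runRetrieveCheckpointStatsTest(status));
        }
    }
}
```

With JUnit 5 the same shape is usually expressed as a `@ParameterizedTest` with `@EnumSource(JobStatus.class)` instead of a hand-rolled loop.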
Re: [PR] [FLINK-30535] Introduce TTL state based benchmarks [flink-benchmarks]
Zakelly commented on code in PR #83: URL: https://github.com/apache/flink-benchmarks/pull/83#discussion_r1430468167 ## src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java: ## @@ -0,0 +1,48 @@ +package org.apache.flink.state.benchmark.ttl; + +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.state.benchmark.StateBenchmarkBase; +import org.openjdk.jmh.annotations.Param; + +import java.util.concurrent.TimeUnit; + +/** The base class for state tests with ttl. */ +public class TtlStateBenchmarkBase extends StateBenchmarkBase { + +/** The expired time of ttl. */ +public enum ExpiredTimeOptions { + +/** 5 seconds. */ +Seconds5(5000), Review Comment: I think it is better to manipulate the `TtlTimeProvider` to finely control the number of keys eliminated in each iteration. So I customized the `TtlTimeProvider` and provided an option for the percentage of keys that expire per iteration. This option affects the test results, especially in `valueGet`.
Here's a running test: ``` # Benchmark: org.apache.flink.state.benchmark.ttl.TtlValueStateBenchmark.valueGet # Parameters: (backendType = HEAP, expiredOption = ExpirePercent3PerIteration, stateVisibility = NeverReturnExpired, updateType = OnCreateAndWrite) # Run progress: 0.00% complete, ETA 00:12:00 # Fork: 1 of 3 # Warmup Iteration 1: 2993.246 ops/ms # Warmup Iteration 2: 3520.954 ops/ms # Warmup Iteration 3: 3640.884 ops/ms # Warmup Iteration 4: 3643.386 ops/ms # Warmup Iteration 5: 3776.677 ops/ms # Warmup Iteration 6: 3739.375 ops/ms # Warmup Iteration 7: 3903.434 ops/ms # Warmup Iteration 8: 3877.286 ops/ms # Warmup Iteration 9: 3913.663 ops/ms # Warmup Iteration 10: 4093.471 ops/ms Iteration 1: 4117.018 ops/ms Iteration 2: 4255.166 ops/ms Iteration 3: 4338.564 ops/ms Iteration 4: 4439.571 ops/ms Iteration 5: 4603.235 ops/ms Iteration 6: 4735.039 ops/ms Iteration 7: 4896.946 ops/ms Iteration 8: 5153.644 ops/ms Iteration 9: 5429.196 ops/ms Iteration 10: 5772.806 ops/ms ``` As we can see, as keys expire, the throughput increases iteration by iteration. I picked 3% as the predefined value, since there are 20 iterations in each test and we cannot let all the keys expire. WDYT? @Myasuka
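The manually-advanced clock described in the comment above can be sketched like this. The class and wiring are illustrative assumptions: Flink's real `TtlTimeProvider` is an interface with a single `currentTimestamp()` method, and how the benchmark registers it with the state backend is not shown here.

```java
import java.util.concurrent.TimeUnit;

// Sketch of a manually-advanced time provider for TTL benchmarks: instead of
// wall-clock time, the benchmark advances a counter by a fixed step per
// iteration, so a predictable fraction of keys expires each iteration.
public class ManualTtlClockSketch {

    static class ManualTimeProvider {
        private long currentMillis;

        long currentTimestamp() {
            return currentMillis;
        }

        void advance(long millis) {
            currentMillis += millis;
        }
    }

    public static void main(String[] args) {
        ManualTimeProvider clock = new ManualTimeProvider();
        long ttlMillis = TimeUnit.SECONDS.toMillis(100);
        // If K keys were written at timestamps spread evenly over one TTL period,
        // advancing the clock by 3% of the TTL expires roughly 3% of the keys,
        // matching the ExpirePercent3PerIteration option in the output above.
        long stepPerIteration = (long) (0.03 * ttlMillis);
        clock.advance(stepPerIteration);
        System.out.println(clock.currentTimestamp()); // 3000
    }
}
```

The benefit over a real clock is determinism: the expiry rate no longer depends on how fast the benchmark iterations happen to run.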
Re: [PR] Bump org.apache.commons:commons-compress from 1.23.0 to 1.24.0 [flink-connector-jdbc]
eskabetxe commented on PR #84: URL: https://github.com/apache/flink-connector-jdbc/pull/84#issuecomment-1861086328 @MartijnVisser LGTM
Re: [PR] [BP-1.18][FLINK-33531][python] Remove cython upper bounds [flink]
JingGe commented on PR #23950: URL: https://github.com/apache/flink/pull/23950#issuecomment-1860976058 @snuyanzin thanks for the heads-up. Let's add it into the 1.18.1 release
[jira] [Updated] (FLINK-33531) Nightly Python fails with NPE at metadataHandlerProvider on AZP (StreamDependencyTests.test_add_python_archive)
[ https://issues.apache.org/jira/browse/FLINK-33531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge updated FLINK-33531: Affects Version/s: 1.18.0 > Nightly Python fails with NPE at metadataHandlerProvider on AZP > (StreamDependencyTests.test_add_python_archive) > --- > > Key: FLINK-33531 > URL: https://issues.apache.org/jira/browse/FLINK-33531 > Project: Flink > Issue Type: Technical Debt > Components: API / Python >Affects Versions: 1.18.0, 1.19.0 >Reporter: Sergey Nuyanzin >Assignee: Xingbo Huang >Priority: Blocker > Labels: pull-request-available, test-stability > Fix For: 1.19.0, 1.18.1 > > > It seems starting 02.11.2023 every master nightly fails with this (that's why > it is a blocker) > for instance > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=54512=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901] > {noformat} > 2023-11-12T02:10:24.5082784Z Nov 12 02:10:24 if is_error(answer)[0]: > 2023-11-12T02:10:24.5083620Z Nov 12 02:10:24 if len(answer) > 1: > 2023-11-12T02:10:24.5084326Z Nov 12 02:10:24 type = answer[1] > 2023-11-12T02:10:24.5085164Z Nov 12 02:10:24 value = > OUTPUT_CONVERTER[type](answer[2:], gateway_client) > 2023-11-12T02:10:24.5086061Z Nov 12 02:10:24 if answer[1] == > REFERENCE_TYPE: > 2023-11-12T02:10:24.5086850Z Nov 12 02:10:24 > raise > Py4JJavaError( > 2023-11-12T02:10:24.5087677Z Nov 12 02:10:24 "An > error occurred while calling {0}{1}{2}.\n". > 2023-11-12T02:10:24.5088538Z Nov 12 02:10:24 > format(target_id, ".", name), value) > 2023-11-12T02:10:24.5089551Z Nov 12 02:10:24 E > py4j.protocol.Py4JJavaError: An error occurred while calling > o3371.executeInsert. 
> 2023-11-12T02:10:24.5090832Z Nov 12 02:10:24 E : > java.lang.NullPointerException: metadataHandlerProvider > 2023-11-12T02:10:24.5091832Z Nov 12 02:10:24 E at > java.util.Objects.requireNonNull(Objects.java:228) > 2023-11-12T02:10:24.5093399Z Nov 12 02:10:24 E at > org.apache.calcite.rel.metadata.RelMetadataQueryBase.getMetadataHandlerProvider(RelMetadataQueryBase.java:122) > 2023-11-12T02:10:24.5094480Z Nov 12 02:10:24 E at > org.apache.calcite.rel.metadata.RelMetadataQueryBase.revise(RelMetadataQueryBase.java:118) > 2023-11-12T02:10:24.5095365Z Nov 12 02:10:24 E at > org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:844) > 2023-11-12T02:10:24.5096306Z Nov 12 02:10:24 E at > org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:307) > 2023-11-12T02:10:24.5097238Z Nov 12 02:10:24 E at > org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:337) > 2023-11-12T02:10:24.5098014Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556) > 2023-11-12T02:10:24.5098753Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:420) > 2023-11-12T02:10:24.5099517Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.executeRuleInstance(HepPlanner.java:243) > 2023-11-12T02:10:24.5100373Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepInstruction$RuleInstance$State.execute(HepInstruction.java:178) > 2023-11-12T02:10:24.5101313Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.lambda$executeProgram$0(HepPlanner.java:211) > 2023-11-12T02:10:24.5102410Z Nov 12 02:10:24 E at > org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableList.forEach(ImmutableList.java:422) > 2023-11-12T02:10:24.5103343Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:210) > 2023-11-12T02:10:24.5104105Z Nov 12 02:10:24 E at > 
org.apache.calcite.plan.hep.HepProgram$State.execute(HepProgram.java:118) > 2023-11-12T02:10:24.5104868Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:205) > 2023-11-12T02:10:24.5105616Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:191) > 2023-11-12T02:10:24.5106421Z Nov 12 02:10:24 E at >
[jira] [Updated] (FLINK-33531) Nightly Python fails with NPE at metadataHandlerProvider on AZP (StreamDependencyTests.test_add_python_archive)
[ https://issues.apache.org/jira/browse/FLINK-33531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge updated FLINK-33531: Fix Version/s: 1.18.1 > Nightly Python fails with NPE at metadataHandlerProvider on AZP > (StreamDependencyTests.test_add_python_archive) > --- > > Key: FLINK-33531 > URL: https://issues.apache.org/jira/browse/FLINK-33531 > Project: Flink > Issue Type: Technical Debt > Components: API / Python >Affects Versions: 1.19.0 >Reporter: Sergey Nuyanzin >Assignee: Xingbo Huang >Priority: Blocker > Labels: pull-request-available, test-stability > Fix For: 1.19.0, 1.18.1 > > > It seems starting 02.11.2023 every master nightly fails with this (that's why > it is a blocker) > for instance > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=54512=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901] > {noformat} > 2023-11-12T02:10:24.5082784Z Nov 12 02:10:24 if is_error(answer)[0]: > 2023-11-12T02:10:24.5083620Z Nov 12 02:10:24 if len(answer) > 1: > 2023-11-12T02:10:24.5084326Z Nov 12 02:10:24 type = answer[1] > 2023-11-12T02:10:24.5085164Z Nov 12 02:10:24 value = > OUTPUT_CONVERTER[type](answer[2:], gateway_client) > 2023-11-12T02:10:24.5086061Z Nov 12 02:10:24 if answer[1] == > REFERENCE_TYPE: > 2023-11-12T02:10:24.5086850Z Nov 12 02:10:24 > raise > Py4JJavaError( > 2023-11-12T02:10:24.5087677Z Nov 12 02:10:24 "An > error occurred while calling {0}{1}{2}.\n". > 2023-11-12T02:10:24.5088538Z Nov 12 02:10:24 > format(target_id, ".", name), value) > 2023-11-12T02:10:24.5089551Z Nov 12 02:10:24 E > py4j.protocol.Py4JJavaError: An error occurred while calling > o3371.executeInsert. 
> 2023-11-12T02:10:24.5090832Z Nov 12 02:10:24 E : > java.lang.NullPointerException: metadataHandlerProvider > 2023-11-12T02:10:24.5091832Z Nov 12 02:10:24 E at > java.util.Objects.requireNonNull(Objects.java:228) > 2023-11-12T02:10:24.5093399Z Nov 12 02:10:24 E at > org.apache.calcite.rel.metadata.RelMetadataQueryBase.getMetadataHandlerProvider(RelMetadataQueryBase.java:122) > 2023-11-12T02:10:24.5094480Z Nov 12 02:10:24 E at > org.apache.calcite.rel.metadata.RelMetadataQueryBase.revise(RelMetadataQueryBase.java:118) > 2023-11-12T02:10:24.5095365Z Nov 12 02:10:24 E at > org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:844) > 2023-11-12T02:10:24.5096306Z Nov 12 02:10:24 E at > org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:307) > 2023-11-12T02:10:24.5097238Z Nov 12 02:10:24 E at > org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:337) > 2023-11-12T02:10:24.5098014Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556) > 2023-11-12T02:10:24.5098753Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:420) > 2023-11-12T02:10:24.5099517Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.executeRuleInstance(HepPlanner.java:243) > 2023-11-12T02:10:24.5100373Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepInstruction$RuleInstance$State.execute(HepInstruction.java:178) > 2023-11-12T02:10:24.5101313Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.lambda$executeProgram$0(HepPlanner.java:211) > 2023-11-12T02:10:24.5102410Z Nov 12 02:10:24 E at > org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableList.forEach(ImmutableList.java:422) > 2023-11-12T02:10:24.5103343Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:210) > 2023-11-12T02:10:24.5104105Z Nov 12 02:10:24 E at > 
org.apache.calcite.plan.hep.HepProgram$State.execute(HepProgram.java:118) > 2023-11-12T02:10:24.5104868Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:205) > 2023-11-12T02:10:24.5105616Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:191) > 2023-11-12T02:10:24.5106421Z Nov 12 02:10:24 E at >
[jira] [Commented] (FLINK-33531) Nightly Python fails with NPE at metadataHandlerProvider on AZP (StreamDependencyTests.test_add_python_archive)
[ https://issues.apache.org/jira/browse/FLINK-33531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798275#comment-17798275 ] Jing Ge commented on FLINK-33531: - let's add it into 1.18.1 release > Nightly Python fails with NPE at metadataHandlerProvider on AZP > (StreamDependencyTests.test_add_python_archive) > --- > > Key: FLINK-33531 > URL: https://issues.apache.org/jira/browse/FLINK-33531 > Project: Flink > Issue Type: Technical Debt > Components: API / Python >Affects Versions: 1.19.0 >Reporter: Sergey Nuyanzin >Assignee: Xingbo Huang >Priority: Blocker > Labels: pull-request-available, test-stability > Fix For: 1.19.0 > > > It seems starting 02.11.2023 every master nightly fails with this (that's why > it is a blocker) > for instance > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=54512=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901] > {noformat} > 2023-11-12T02:10:24.5082784Z Nov 12 02:10:24 if is_error(answer)[0]: > 2023-11-12T02:10:24.5083620Z Nov 12 02:10:24 if len(answer) > 1: > 2023-11-12T02:10:24.5084326Z Nov 12 02:10:24 type = answer[1] > 2023-11-12T02:10:24.5085164Z Nov 12 02:10:24 value = > OUTPUT_CONVERTER[type](answer[2:], gateway_client) > 2023-11-12T02:10:24.5086061Z Nov 12 02:10:24 if answer[1] == > REFERENCE_TYPE: > 2023-11-12T02:10:24.5086850Z Nov 12 02:10:24 > raise > Py4JJavaError( > 2023-11-12T02:10:24.5087677Z Nov 12 02:10:24 "An > error occurred while calling {0}{1}{2}.\n". > 2023-11-12T02:10:24.5088538Z Nov 12 02:10:24 > format(target_id, ".", name), value) > 2023-11-12T02:10:24.5089551Z Nov 12 02:10:24 E > py4j.protocol.Py4JJavaError: An error occurred while calling > o3371.executeInsert. 
> 2023-11-12T02:10:24.5090832Z Nov 12 02:10:24 E : > java.lang.NullPointerException: metadataHandlerProvider > 2023-11-12T02:10:24.5091832Z Nov 12 02:10:24 E at > java.util.Objects.requireNonNull(Objects.java:228) > 2023-11-12T02:10:24.5093399Z Nov 12 02:10:24 E at > org.apache.calcite.rel.metadata.RelMetadataQueryBase.getMetadataHandlerProvider(RelMetadataQueryBase.java:122) > 2023-11-12T02:10:24.5094480Z Nov 12 02:10:24 E at > org.apache.calcite.rel.metadata.RelMetadataQueryBase.revise(RelMetadataQueryBase.java:118) > 2023-11-12T02:10:24.5095365Z Nov 12 02:10:24 E at > org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:844) > 2023-11-12T02:10:24.5096306Z Nov 12 02:10:24 E at > org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:307) > 2023-11-12T02:10:24.5097238Z Nov 12 02:10:24 E at > org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:337) > 2023-11-12T02:10:24.5098014Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556) > 2023-11-12T02:10:24.5098753Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:420) > 2023-11-12T02:10:24.5099517Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.executeRuleInstance(HepPlanner.java:243) > 2023-11-12T02:10:24.5100373Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepInstruction$RuleInstance$State.execute(HepInstruction.java:178) > 2023-11-12T02:10:24.5101313Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.lambda$executeProgram$0(HepPlanner.java:211) > 2023-11-12T02:10:24.5102410Z Nov 12 02:10:24 E at > org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableList.forEach(ImmutableList.java:422) > 2023-11-12T02:10:24.5103343Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:210) > 2023-11-12T02:10:24.5104105Z Nov 12 02:10:24 E at > 
org.apache.calcite.plan.hep.HepProgram$State.execute(HepProgram.java:118) > 2023-11-12T02:10:24.5104868Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:205) > 2023-11-12T02:10:24.5105616Z Nov 12 02:10:24 E at > org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:191) > 2023-11-12T02:10:24.5106421Z Nov 12 02:10:24 E at >
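The bare "metadataHandlerProvider" message in the NPE above comes from `Objects.requireNonNull(ref, msg)`, which throws a `NullPointerException` whose message is exactly the second argument when the reference is null. A minimal sketch of the mechanism (the class and field here are hypothetical stand-ins, not Calcite's actual `RelMetadataQueryBase`):

```java
import java.util.Objects;

// Hypothetical stand-in: the provider field was never initialized, so the
// null check fails, using the field's name as the exception message.
public class MetadataProviderCheck {
    static Object metadataHandlerProvider = null;

    static Object getMetadataHandlerProvider() {
        // Objects.requireNonNull(ref, msg) throws NullPointerException(msg)
        // when ref is null -- producing "java.lang.NullPointerException:
        // metadataHandlerProvider" as in the stack trace above.
        return Objects.requireNonNull(metadataHandlerProvider, "metadataHandlerProvider");
    }

    public static void main(String[] args) {
        try {
            getMetadataHandlerProvider();
        } catch (NullPointerException e) {
            System.out.println("NullPointerException: " + e.getMessage());
            // prints: NullPointerException: metadataHandlerProvider
        }
    }
}
```

This is why the message names the missing component rather than a code location: it is the null-check's label, so the fix is to ensure the provider is set before the metadata query is used.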
[PR] [FLINK-33560][Connectors/AWS] Externalize AWS Python connectors from Flink to AWS project [flink-connector-aws]
dannycranmer opened a new pull request, #121: URL: https://github.com/apache/flink-connector-aws/pull/121

## Purpose of the change

Inspired by https://github.com/apache/flink-connector-kafka/pull/69, this moves `kinesis.py` and `test_kinesis.py` from `apache/flink` to `apache/flink-connector-aws`. Running `mvn clean install` downloads the testing infra scripts from the main Flink repo (flink-python/dev/build-wheels.sh, flink-python/dev/install_command.sh, flink-python/dev/lint-python.sh).

## Verifying this change

Tests can be run with the following commands:

```
cd flink-python-connector-aws
chmod a+x dev/*
./dev/lint-python.sh -e mypy,sphinx
```

Additionally, I have built the archive via `./dev/build-wheels.sh` and then included it in a local sample app that consumes from Kinesis and prints to the terminal.

## Significant changes

*(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)*

- [ ] Dependencies have been added or upgraded
- [ ] Public API has been changed (Public API is any class annotated with `@Public(Evolving)`)
- [ ] Serializers have been changed
- [ ] New feature has been introduced
  - If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33560) Externalize Kinesis Python connector code
[ https://issues.apache.org/jira/browse/FLINK-33560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33560: --- Labels: pull-request-available (was: ) > Externalize Kinesis Python connector code > - > > Key: FLINK-33560 > URL: https://issues.apache.org/jira/browse/FLINK-33560 > Project: Flink > Issue Type: Sub-task > Components: API / Python, Connectors / Kinesis >Affects Versions: 1.18.0, aws-connector-4.2.0 >Reporter: Márton Balassi >Assignee: Danny Cranmer >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0, aws-connector-4.3.0 > > > See description of parent ticket for context. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-30535] Introduce TTL state based benchmarks [flink-benchmarks]
Zakelly commented on code in PR #83: URL: https://github.com/apache/flink-benchmarks/pull/83#discussion_r1430371694

## src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java:
## @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.benchmark.ttl;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.state.benchmark.StateBenchmarkBase;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.applyToAllKeys;
+import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.compactState;
+import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getListState;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.listValueCount;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+
+/** Implementation for list state benchmark testing. */
+public class TtlListStateBenchmark extends TtlStateBenchmarkBase {
+    private final String STATE_NAME = "listState";
+    private final ListStateDescriptor<Long> STATE_DESC =
+            configTtl(new ListStateDescriptor<>(STATE_NAME, Long.class));
+    private ListState<Long> listState;
+    private List<Long> dummyLists;
+
+    public static void main(String[] args) throws RunnerException {
+        Options opt =
+                new OptionsBuilder()
+                        .verbosity(VerboseMode.NORMAL)
+                        .include(".*" + TtlListStateBenchmark.class.getCanonicalName() + ".*")
+                        .build();
+
+        new Runner(opt).run();
+    }
+
+    @Setup
+    public void setUp() throws Exception {
+        keyedStateBackend = createKeyedStateBackend();
+        listState = getListState(keyedStateBackend, STATE_DESC);
+        dummyLists = new ArrayList<>(listValueCount);
+        for (int i = 0; i < listValueCount; ++i) {
+            dummyLists.add(random.nextLong());
+        }
+        keyIndex = new AtomicInteger();
+    }
+
+    @Setup(Level.Iteration)
+    public void setUpPerIteration() throws Exception {
+        for (int i = 0; i < setupKeyCount; ++i) {
+            keyedStateBackend.setCurrentKey((long) i);
+            listState.add(random.nextLong());
+        }
+        // make sure only one sst file left, so all get invocation will access this single file,
+        // to prevent the spike caused by different key distribution in multiple sst files,
+        // the more access to the older sst file, the lower throughput will be.
+        if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {

Review Comment:
After some tries, I found there is no easy way to do so, since the `Ttl*StateBenchmark` classes extend `TtlStateBenchmarkBase`, and they cannot extend `*StateBenchmark` at the same time. So a possible way is to extract a `TestHarness` or `TestContext` to hold the variables. I only did the refactor in `valueAdd` and tested it on my local machine.

Before:
```
Benchmark                     (backendType)   Mode  Cnt     Score     Error   Units
ValueStateBenchmark.valueAdd           HEAP  thrpt   30  5815.682 ± 122.008  ops/ms

Benchmark                     (backendType)   Mode  Cnt     Score     Error   Units
ValueStateBenchmark.valueAdd           HEAP  thrpt   30  5817.108 ± 162.588  ops/ms

Benchmark                     (backendType)   Mode  Cnt     Score     Error   Units
ValueStateBenchmark.valueAdd           HEAP  thrpt   30  5792.671 ±
```
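The refactoring constraint described in the review comment — the TTL benchmarks already extend `TtlStateBenchmarkBase`, so they cannot also extend the non-TTL `*StateBenchmark` classes — is the usual single-inheritance limit, and extracting the shared state into a harness object held by both hierarchies is one way around it. A rough sketch of that shape (all class names here are hypothetical illustrations, not the actual flink-benchmarks classes):

```java
import java.util.Random;

// Shared state and setup live in a harness object instead of a base class,
// so both the TTL and non-TTL benchmark hierarchies can reuse it by
// composition rather than by a second superclass.
class StateBenchmarkHarness {
    final Random random = new Random();
    long currentKey;

    void setCurrentKey(long key) {
        currentKey = key;
    }

    long nextValue() {
        return random.nextLong();
    }
}

class ValueStateBenchmark {
    // non-TTL benchmark: holds the harness directly
    final StateBenchmarkHarness harness = new StateBenchmarkHarness();
}

class TtlValueStateBenchmark /* extends TtlStateBenchmarkBase */ {
    // TTL benchmark: same harness, no conflicting superclass needed
    final StateBenchmarkHarness harness = new StateBenchmarkHarness();
}

public class HarnessDemo {
    public static void main(String[] args) {
        ValueStateBenchmark plain = new ValueStateBenchmark();
        plain.harness.setCurrentKey(7L);
        System.out.println(plain.harness.currentKey); // prints 7
    }
}
```

This is only a shape; the real refactor would also move the key-index and backend-creation logic into the harness so both hierarchies share one setup path.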
[jira] [Commented] (FLINK-32850) [JUnit5 Migration] The io package of flink-runtime module
[ https://issues.apache.org/jira/browse/FLINK-32850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798267#comment-17798267 ] Jiabao Sun commented on FLINK-32850: Merged master (1.19) via: 1c67cccd2fdd6c674a38e0c26fe990e1dd7b62ae > [JUnit5 Migration] The io package of flink-runtime module > - > > Key: FLINK-32850 > URL: https://issues.apache.org/jira/browse/FLINK-32850 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Rui Fan >Assignee: Jiabao Sun >Priority: Minor > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33823] Make PlannerQueryOperation SQL serializable [flink]
twalthr commented on code in PR #23948: URL: https://github.com/apache/flink/pull/23948#discussion_r1430361998 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerQueryOperation.java: ## @@ -72,6 +75,13 @@ public String asSummaryString() { "PlannerNode", Collections.emptyMap(), getChildren(), Operation::asSummaryString); } +@Override +public String asSerializableString() { +final RelToSqlConverter relToSqlConverter = new RelToSqlConverter(AnsiSqlDialect.DEFAULT); Review Comment: We should not expose Calcite exceptions in this case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33876][table-planner][JUnit5 Migration] Introduce methodName method in TableTestBase [flink]
flinkbot commented on PR #23952: URL: https://github.com/apache/flink/pull/23952#issuecomment-1860933072 ## CI report: * 85c84e98f1c1c483972dbe30b4e30ca127817360 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33823] Make PlannerQueryOperation SQL serializable [flink]
twalthr commented on code in PR #23948: URL: https://github.com/apache/flink/pull/23948#discussion_r1430361475 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerQueryOperation.java: ## @@ -72,6 +75,13 @@ public String asSummaryString() { "PlannerNode", Collections.emptyMap(), getChildren(), Operation::asSummaryString); } +@Override +public String asSerializableString() { +final RelToSqlConverter relToSqlConverter = new RelToSqlConverter(AnsiSqlDialect.DEFAULT); Review Comment: In any case we need try-catch around it. Or will e.g. Table API queries that take the traditional route properly serialize? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
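The reviewer's two suggestions — do not expose Calcite exceptions, and put a try-catch around the conversion — could be combined roughly as follows. This is a sketch only: `convertToSql` is a hypothetical stand-in for the `RelToSqlConverter` call, and the wrapper exception chosen here is an assumption, not the PR's actual code.

```java
// Sketch: wrap the internal converter call so callers see a descriptive,
// framework-level exception instead of a raw Calcite one.
public class SerializableStringSketch {

    // Hypothetical stand-in for the RelToSqlConverter invocation; the real
    // code would operate on a RelNode and an SQL dialect.
    static String convertToSql(Object relNode) {
        throw new UnsupportedOperationException("node kind not supported");
    }

    static String asSerializableString(Object relNode) {
        try {
            return convertToSql(relNode);
        } catch (RuntimeException e) {
            // Translate the internal failure into a stable, documented error,
            // keeping the original exception as the cause for debugging.
            throw new IllegalStateException(
                    "Given QueryOperation cannot be serialized to SQL.", e);
        }
    }

    public static void main(String[] args) {
        try {
            asSerializableString(new Object());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Catching `RuntimeException` here is deliberate: Calcite's planner exceptions are unchecked, so a blanket catch-and-wrap is the simplest way to keep them out of the public API surface.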
[jira] [Comment Edited] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector
[ https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798178#comment-17798178 ] david radley edited comment on FLINK-33365 at 12/18/23 4:12 PM: [~Sergey Nuyanzin] from the discussion above, I am thinking the issue was not really an issue; this was my misunderstanding. The options I see are: 1) Do as you say, but wouldn't this be a regression for the jdbc driver without filters, as there is likely to be a performance impact without filter pushdown 2) We could complete the fix I have. I have it working locally with the OR case. I am looking at implementing more test cases. I am struggling to make the unit test cases, to be defined in data with a for loop to execute them. If you are ok with the style of unit tests I have, I could have this up today or tomorrow. 3) If we do not view this as a blocker - we could release without reverting the commit and continue investigating. My preference would be option 2 - we are looking to have this fix in and would probably need to patch out code to pick it up. But if this takes too long to get merged, option 3 would be an option. Strangely the config option to disable predicate pushdowns works for me. I was not expecting it to work as it should only affect the legacy style of predicate pushdown. WDYT? was (Author: JIRAUSER300523): [~Sergey Nuyanzin] from the discussion above. I am thinking the s issue was not really an issue, this was my misunderstanding. The options I see are: 1) Do as you say, but this wouldn't this be regression for the jdbc driver without filters, as there is likely to be a performance impact without filter pushdown 2) We could complete the fix I have. I have it working locally with the OR case. I am looking at implementing more test cases.I am struggling to make the unit test cases, to be defined in data with a for loop to execute them. If you are ok with the style of unit tests I have, I could have this up today or tomorrow. 
3) If we do not view this as a blocker - we could release without reverting the commit and continue investigating. My preference would be option 2 - we are looking to have this fix in and would probably need to patch out code to pick it up. But if this takes too long to get merged, option 3 would be an option. Strangely the config option to disable predicate pushdowns works for me. I was not expecting it to work as it should only effect the legacy style of predicate pushdown. WDYT? > Missing filter condition in execution plan containing lookup join with mysql > jdbc connector > --- > > Key: FLINK-33365 > URL: https://issues.apache.org/jira/browse/FLINK-33365 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: 1.18.0, 1.17.1 > Environment: Flink 1.17.1 & Flink 1.18.0 with > flink-connector-jdbc-3.1.1-1.17.jar >Reporter: macdoor615 >Assignee: david radley >Priority: Critical > Labels: pull-request-available > Attachments: flink-connector-jdbc-3.0.0-1.16.png, > flink-connector-jdbc-3.1.1-1.17.png > > > create table in flink with sql-client.sh > {code:java} > CREATE TABLE default_catalog.default_database.a ( > ip string, > proctime as proctime() > ) > WITH ( > 'connector' = 'datagen' > );{code} > create table in mysql > {code:java} > create table b ( > ip varchar(20), > type int > ); {code} > > Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar* > excute in sql-client.sh > {code:java} > explain SELECT * FROM default_catalog.default_database.a left join > bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and > a.ip = b.ip; {code} > get the execution plan > {code:java} > ... 
> == Optimized Execution Plan == > Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type]) > +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], > lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip > AS VARCHAR(2147483647)) AS ip0]) > +- Calc(select=[ip, PROCTIME() AS proctime]) > +- TableSourceScan(table=[[default_catalog, default_database, a]], > fields=[ip]){code} > > excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and > *flink-connector-jdbc-3.0.0-1.16.jar* > get the execution plan > {code:java} > ... > == Optimized Execution Plan == > Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type]) > +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], > lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 > AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0]) > +-
[jira] [Comment Edited] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector
[ https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798178#comment-17798178 ] david radley edited comment on FLINK-33365 at 12/18/23 4:10 PM: [~Sergey Nuyanzin] from the discussion above. I am thinking the s issue was not really an issue, this was my misunderstanding. The options I see are: 1) Do as you say, but this wouldn't this be regression for the jdbc driver without filters, as there is likely to be a performance impact without filter pushdown 2) We could complete the fix I have. I have it working locally with the OR case. I am looking at implementing more test cases.I am struggling to make the unit test cases, to be defined in data with a for loop to execute them. If you are ok with the style of unit tests I have, I could have this up today or tomorrow. 3) If we do not view this as a blocker - we could release without reverting the commit and continue investigating. My preference would be option 2 - we are looking to have this fix in and would probably need to patch out code to pick it up. But if this takes too long to get merged, option 3 would be an option. Strangely the config option to disable predicate pushdowns works for me. I was not expecting it to work as it should only effect the legacy style of predicate pushdown. WDYT? was (Author: JIRAUSER300523): [~Sergey Nuyanzin] from the discussion above. I am thinking the s issue was not really an issue, this was my misunderstanding. The options I see are: 1) Do as you say, but this wouldn't this be regression for the jdbc driver without filters, as there is likely to be a performance impact without filter pushdown 2) We could complete the fix I have. I have it working locally with the OR case. I am looking at the implementing more test cases.I am struggling to make the unit test cases, to be defined in data with a for loop to execute them. If you are ok with the style of unit tests I have, I could have this up today or tomorrow. 
3) If we do not view this as a blocker - we could release without reverting the commit and continue investigating. My preference would be option 2 - we are looking to have this fix in and would probably need to patch out code to pick it up. But if this takes too long to get merged, option 3 would be an option. Strangely the config option to disable predicate pushdowns works for me. I was not expecting it to work as it should only effect the legacy style of predicate pushdown. WDYT? > Missing filter condition in execution plan containing lookup join with mysql > jdbc connector > --- > > Key: FLINK-33365 > URL: https://issues.apache.org/jira/browse/FLINK-33365 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: 1.18.0, 1.17.1 > Environment: Flink 1.17.1 & Flink 1.18.0 with > flink-connector-jdbc-3.1.1-1.17.jar >Reporter: macdoor615 >Assignee: david radley >Priority: Critical > Labels: pull-request-available > Attachments: flink-connector-jdbc-3.0.0-1.16.png, > flink-connector-jdbc-3.1.1-1.17.png > > > create table in flink with sql-client.sh > {code:java} > CREATE TABLE default_catalog.default_database.a ( > ip string, > proctime as proctime() > ) > WITH ( > 'connector' = 'datagen' > );{code} > create table in mysql > {code:java} > create table b ( > ip varchar(20), > type int > ); {code} > > Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar* > excute in sql-client.sh > {code:java} > explain SELECT * FROM default_catalog.default_database.a left join > bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and > a.ip = b.ip; {code} > get the execution plan > {code:java} > ... 
> == Optimized Execution Plan == > Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type]) > +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], > lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip > AS VARCHAR(2147483647)) AS ip0]) > +- Calc(select=[ip, PROCTIME() AS proctime]) > +- TableSourceScan(table=[[default_catalog, default_database, a]], > fields=[ip]){code} > > excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and > *flink-connector-jdbc-3.0.0-1.16.jar* > get the execution plan > {code:java} > ... > == Optimized Execution Plan == > Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type]) > +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], > lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 > AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0]) > +-
[jira] [Updated] (FLINK-33876) [JUnit5 Migration] Introduce methodName method in TableTestBase
[ https://issues.apache.org/jira/browse/FLINK-33876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33876: --- Labels: pull-request-available (was: ) > [JUnit5 Migration] Introduce methodName method in TableTestBase > --- > > Key: FLINK-33876 > URL: https://issues.apache.org/jira/browse/FLINK-33876 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner, Tests >Affects Versions: 1.19.0 >Reporter: Jiabao Sun >Priority: Major > Labels: pull-request-available > > After completing the JUnit5 migration in the table planner, there is an > incompatibility issue with JUnit TestName and TestInfo. Therefore, > considering introducing the methodName method in TableTestBase. External > connectors's TablePlanTest can override this method when performing JUnit 5 > migration for TableTestBase to avoid compilation issues. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33876][table-planner][JUnit5 Migration] Introduce methodName method in TableTestBase [flink]
Jiabao-Sun commented on PR #23952: URL: https://github.com/apache/flink/pull/23952#issuecomment-1860909405 Hi @leonardBang, please help review it when you have time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-33876][table-planner][JUnit5 Migration] Introduce methodName method in TableTestBase [flink]
Jiabao-Sun opened a new pull request, #23952: URL: https://github.com/apache/flink/pull/23952 ## What is the purpose of the change [FLINK-33876][table-planner][JUnit5 Migration] Introduce methodName method in TableTestBase ## Brief change log After completing the JUnit5 migration in the table planner, there is an incompatibility issue with JUnit TestName and TestInfo. Therefore, we are considering introducing the methodName method in TableTestBase. External connectors' TablePlanTest can override this method when performing JUnit 5 migration for TableTestBase to avoid compilation issues. ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented)
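The pattern the PR describes — a base test class exposing an overridable method-name accessor so that subclasses still on JUnit 4 can compile against a JUnit 5 base — can be sketched in plain Java. The class and method names below are illustrative stand-ins, not the actual Flink `TableTestBase` code:

```java
// Sketch of the overridable methodName() idea (names are hypothetical).
class TableTestBaseSketch {
    // With JUnit 5, a @BeforeEach(TestInfo testInfo) hook would populate this
    // field from testInfo.getTestMethod(); here it is just a plain field.
    String currentMethodName = "unknown";

    // Subclasses may override this to supply the name from their own source,
    // e.g. a JUnit 4 TestName rule, avoiding a compile-time dependency clash.
    String methodName() {
        return currentMethodName;
    }
}

class LegacyPlanTestSketch extends TableTestBaseSketch {
    // Stand-in for an external connector's JUnit 4-based TablePlanTest.
    @Override
    String methodName() {
        return "testMyPlan";
    }
}

public class MethodNameDemo {
    public static void main(String[] args) {
        System.out.println(new TableTestBaseSketch().methodName());
        System.out.println(new LegacyPlanTestSketch().methodName());
    }
}
```

The design choice is simply indirection: the base class owns a single access point for the test method name, so each subclass can decide where that name comes from during a staged JUnit 4 to 5 migration.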
[jira] [Updated] (FLINK-33876) [JUnit5 Migration] Introduce methodName method in TableTestBase
[ https://issues.apache.org/jira/browse/FLINK-33876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun updated FLINK-33876: --- Summary: [JUnit5 Migration] Introduce methodName method in TableTestBase (was: [JUnit5 Migration] Introduce testName method in TableTestBase) > [JUnit5 Migration] Introduce methodName method in TableTestBase > --- > > Key: FLINK-33876 > URL: https://issues.apache.org/jira/browse/FLINK-33876 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner, Tests >Affects Versions: 1.19.0 >Reporter: Jiabao Sun >Priority: Major > > After completing the JUnit5 migration in the table planner, there is an > incompatibility issue with JUnit TestName and TestInfo. Therefore, we are > considering introducing the methodName method in TableTestBase. External > connectors' TablePlanTest can override this method when performing JUnit 5 > migration for TableTestBase to avoid compilation issues.
[jira] [Commented] (FLINK-31472) AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread
[ https://issues.apache.org/jira/browse/FLINK-31472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798256#comment-17798256 ] Ahmed Hamdy commented on FLINK-31472: - [~Sergey Nuyanzin] I have pushed a PR with the fix; it would be great if you could review it. I am going to rebase the fix on 1.17 and 1.18 once this is merged. > AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread > > > Key: FLINK-31472 > URL: https://issues.apache.org/jira/browse/FLINK-31472 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.17.0, 1.16.1, 1.19.0 >Reporter: Ran Tao >Assignee: Ahmed Hamdy >Priority: Major > Labels: pull-request-available, test-stability > > When running mvn clean test, this case fails occasionally. > {noformat} > [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.955 > s <<< FAILURE! - in > org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest > [ERROR] > org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest.testSinkThroughputShouldThrottleToHalfBatchSize > Time elapsed: 0.492 s <<< ERROR! > java.lang.IllegalStateException: Illegal thread detected. This method must be > called from inside the mailbox thread! 
> at > org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkIsMailboxThread(TaskMailboxImpl.java:262) > at > org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:137) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:84) > at > org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.flush(AsyncSinkWriter.java:367) > at > org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.lambda$registerCallback$3(AsyncSinkWriter.java:315) > at > org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService$CallbackTask.onProcessingTime(TestProcessingTimeService.java:199) > at > org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService.setCurrentTime(TestProcessingTimeService.java:76) > at > org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest.testSinkThroughputShouldThrottleToHalfBatchSize(AsyncSinkWriterThrottlingTest.java:64) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at org.junit.runner.JUnitCore.run(JUnitCore.java:115) > at > org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) > at > org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) > at > org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127) > at >
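The failure above boils down to a thread-confinement invariant: the task mailbox records its owning thread and rejects `take()`/`yield()` calls from any other thread (here, the test's processing-time callback thread drives `AsyncSinkWriter.flush`). A simplified sketch of that invariant, with names modelled on `TaskMailboxImpl` but not the real implementation:

```java
// Minimal thread-confinement sketch: only the owning thread may drain the mailbox.
public class MailboxSketch {
    private final Thread mailboxThread;
    private final java.util.ArrayDeque<Runnable> queue = new java.util.ArrayDeque<>();

    public MailboxSketch(Thread owner) {
        this.mailboxThread = owner;
    }

    // Mirrors the check that produced the stack trace above.
    private void checkIsMailboxThread() {
        if (Thread.currentThread() != mailboxThread) {
            throw new IllegalStateException(
                    "Illegal thread detected. This method must be called from inside the mailbox thread!");
        }
    }

    // Fails fast when e.g. a timer/callback thread tries to drive the mailbox.
    public Runnable take() {
        checkIsMailboxThread();
        return queue.poll(); // null when empty in this sketch
    }

    public static void main(String[] args) {
        MailboxSketch mailbox = new MailboxSketch(Thread.currentThread());
        mailbox.take(); // owning thread: permitted
    }
}
```

Seen through this sketch, the test fix is about *where* the flush runs, not about the check itself: the callback must hand work over to the mailbox thread instead of calling into it directly.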
[jira] [Created] (FLINK-33876) [JUnit5 Migration] Introduce testName method in TableTestBase
Jiabao Sun created FLINK-33876: -- Summary: [JUnit5 Migration] Introduce testName method in TableTestBase Key: FLINK-33876 URL: https://issues.apache.org/jira/browse/FLINK-33876 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner, Tests Affects Versions: 1.19.0 Reporter: Jiabao Sun After completing the JUnit5 migration in the table planner, there is an incompatibility issue with JUnit TestName and TestInfo. Therefore, we are considering introducing the methodName method in TableTestBase. External connectors' TablePlanTest can override this method when performing JUnit 5 migration for TableTestBase to avoid compilation issues.
Re: [PR] [FLINK-32850][flink-runtime][JUnit5 Migration] The io.network.api package of flink-runtime module [flink]
dawidwys merged PR #23590: URL: https://github.com/apache/flink/pull/23590
[jira] [Commented] (FLINK-33862) Flink Unit Test Failures on 1.18.0
[ https://issues.apache.org/jira/browse/FLINK-33862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798251#comment-17798251 ] Prabhu Joseph commented on FLINK-33862: --- Yes right. Thanks [~martijnvisser]. > Flink Unit Test Failures on 1.18.0 > -- > > Key: FLINK-33862 > URL: https://issues.apache.org/jira/browse/FLINK-33862 > Project: Flink > Issue Type: Bug >Affects Versions: 1.18.0, 1.19.0 >Reporter: Prabhu Joseph >Priority: Major > > Flink Unit Test Failures on 1.18.0. There are 100+ unit test cases failing > due to below common issues. > *Issue 1* > {code:java} > ./mvnw -DfailIfNoTests=false -Dmaven.test.failure.ignore=true > -Dtest=ExecutionPlanAfterExecutionTest test > [INFO] Running org.apache.flink.client.program.ExecutionPlanAfterExecutionTest > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) > at > java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > at > org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268) > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > at > java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > at > org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1267) > at > 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$1(ClassLoadingUtils.java:93) > at > org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) > at > org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > at > java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > at > org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:47) > at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:310) > at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307) > at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234) > at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) > at > org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$DirectExecutionContext.execute(ScalaFutureUtils.java:65) > at > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72) > at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288) > at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288) > at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288) > at org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:629) > at > org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:34) > at > 
org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:33) > at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536) > at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) > at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) > at > org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:73) > at > org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:110) > at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) >
Re: [PR] [FLINK-32850][flink-runtime][JUnit5 Migration] The io.network.api package of flink-runtime module [flink]
dawidwys commented on code in PR #23590: URL: https://github.com/apache/flink/pull/23590#discussion_r1430305765 ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java: ## @@ -22,37 +22,29 @@ import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.junit.Test; +import org.junit.jupiter.api.Test; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for the {@link CheckpointBarrier} type. */ -public class CheckpointBarrierTest { +class CheckpointBarrierTest { /** * Test serialization of the checkpoint barrier. The checkpoint barrier does not support its own * serialization, in order to be immutable. */ @Test -public void testSerialization() throws Exception { +void testSerialization() { long id = Integer.MAX_VALUE + 123123L; long timestamp = Integer.MAX_VALUE + 1228L; CheckpointOptions options = CheckpointOptions.forCheckpointWithDefaultLocation(); CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, options); -try { -barrier.write(new DataOutputSerializer(1024)); -fail("should throw an exception"); Review Comment: I don't think it adds any value. The `fail("should throw an exception")` is there simply to make sure the exception is thrown.
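For readers unfamiliar with the idiom under discussion: the JUnit 4 try/`fail` pattern and AssertJ's `assertThatThrownBy` express the same check, which is why the explicit `fail("should throw an exception")` becomes redundant after the migration. AssertJ is not assumed available here, so the sketch below uses a minimal hand-rolled stand-in with the same contract:

```java
// Stand-in for AssertJ's assertThatThrownBy (illustration only; real tests
// should use AssertJ itself). The helper fails when no exception is thrown,
// which is exactly the job the explicit fail(...) used to do.
public class ThrownBySketch {

    @FunctionalInterface
    interface ThrowingCallable {
        void call() throws Throwable;
    }

    static Throwable assertThatThrownBy(ThrowingCallable callable) {
        try {
            callable.call();
        } catch (Throwable t) {
            return t; // caller can go on to assert on type/message
        }
        throw new AssertionError("should throw an exception");
    }

    public static void main(String[] args) {
        // JUnit 4 style:
        //   try { barrier.write(out); fail("should throw an exception"); }
        //   catch (UnsupportedOperationException expected) { }
        // AssertJ style collapses to a single expression:
        Throwable t = assertThatThrownBy(() -> {
            throw new UnsupportedOperationException("serialization not supported");
        });
        System.out.println(t.getClass().getSimpleName());
    }
}
```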
Re: [PR] [FLINK-33823] Make PlannerQueryOperation SQL serializable [flink]
twalthr commented on code in PR #23948: URL: https://github.com/apache/flink/pull/23948#discussion_r1430298273 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerQueryOperation.java: ## @@ -72,6 +75,13 @@ public String asSummaryString() { "PlannerNode", Collections.emptyMap(), getChildren(), Operation::asSummaryString); } +@Override +public String asSerializableString() { +final RelToSqlConverter relToSqlConverter = new RelToSqlConverter(AnsiSqlDialect.DEFAULT); Review Comment: Not sure if `AnsiSqlDialect.DEFAULT` is a good idea. This might have implications on serialization of quotes or some other tiny details. Since `PlannerQueryOperation` comes right out of the planner, we can also pass some context into it. Don't we have some RelToSqlConverter or Flink dialect somewhere?
[jira] [Commented] (FLINK-33588) Fix Flink Checkpointing Statistics Bug
[ https://issues.apache.org/jira/browse/FLINK-33588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798240#comment-17798240 ] Jing Ge commented on FLINK-33588: - [~zhutong66] could you please write a test for your change? > Fix Flink Checkpointing Statistics Bug > -- > > Key: FLINK-33588 > URL: https://issues.apache.org/jira/browse/FLINK-33588 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.14.5, 1.16.0, 1.17.0, 1.15.2, 1.14.6, 1.18.0, 1.17.1 >Reporter: Tongtong Zhu >Assignee: Tongtong Zhu >Priority: Critical > Labels: pull-request-available > Fix For: 1.19.0, 1.18.1 > > Attachments: FLINK-33588.patch, image-2023-12-11-17-35-23-391.png, > image-2023-12-13-11-35-43-780.png, image-2023-12-15-13-59-28-201.png, > newCommit-FLINK-33688.patch > > > When the Flink task is first started, the checkpoint data is null due to the > lack of data, and Percentile throws a null pointer exception when calculating > the percentage. After multiple tests, I found that it is necessary to set an > initial value for the statistical data value of the checkpoint when the > checkpoint data is null (i.e. at the beginning of the task) to solve this > problem. > The following is an abnormal description of the bug: > 2023-09-13 15:02:54,608 ERROR > org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler > [] - Unhandled exception. 
> org.apache.commons.math3.exception.NullArgumentException: input array > at > org.apache.commons.math3.util.MathArrays.verifyValues(MathArrays.java:1650) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.commons.math3.stat.descriptive.AbstractUnivariateStatistic.test(AbstractUnivariateStatistic.java:158) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.commons.math3.stat.descriptive.rank.Percentile.evaluate(Percentile.java:272) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.commons.math3.stat.descriptive.rank.Percentile.evaluate(Percentile.java:241) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogramStatistics$CommonMetricsSnapshot.getPercentile(DescriptiveStatisticsHistogramStatistics.java:159) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogramStatistics.getQuantile(DescriptiveStatisticsHistogramStatistics.java:53) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.flink.runtime.checkpoint.StatsSummarySnapshot.getQuantile(StatsSummarySnapshot.java:108) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.flink.runtime.rest.messages.checkpoints.StatsSummaryDto.valueOf(StatsSummaryDto.java:81) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.createCheckpointingStatistics(CheckpointingStatisticsHandler.java:129) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleRequest(CheckpointingStatisticsHandler.java:84) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleRequest(CheckpointingStatisticsHandler.java:58) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > 
org.apache.flink.runtime.rest.handler.job.AbstractAccessExecutionGraphHandler.handleRequest(AbstractAccessExecutionGraphHandler.java:68) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$0(AbstractExecutionGraphHandler.java:87) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) > [?:1.8.0_151] > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > [?:1.8.0_151] > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > [?:1.8.0_151] > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > [?:1.8.0_151] > at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_151] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > [?:1.8.0_151] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > [?:1.8.0_151] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > [?:1.8.0_151] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > [?:1.8.0_151] > at
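The fix FLINK-33588 describes — substituting an initial value while the checkpoint sample array is still null or empty — can be sketched as follows. The method name, default value, and nearest-rank percentile below are illustrative only; the real code path goes through commons-math3's `Percentile`, whose `evaluate` is what rejects the null input above:

```java
import java.util.Arrays;

// Illustrative guard for the NullArgumentException above: until the first
// checkpoint completes there are no samples, so report a default instead of
// letting the percentile computation throw. Names here are hypothetical.
public class QuantileSketch {

    static double quantileOrDefault(double[] samples, double quantile, double defaultValue) {
        if (samples == null || samples.length == 0) {
            // No checkpoint data yet (job just started) -> initial value.
            return defaultValue;
        }
        double[] sorted = samples.clone();
        Arrays.sort(sorted);
        // Nearest-rank percentile; adequate for this sketch.
        int idx = (int) Math.ceil(quantile * sorted.length) - 1;
        return sorted[Math.min(Math.max(idx, 0), sorted.length - 1)];
    }

    public static void main(String[] args) {
        System.out.println(quantileOrDefault(null, 0.5, 0.0)); // just-started job
        System.out.println(quantileOrDefault(new double[] {30, 10, 20, 40}, 0.5, 0.0));
    }
}
```

The design point is simply to make the "no data yet" state a legal input at the statistics boundary, so the REST handler never sees the exception.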
[jira] [Comment Edited] (FLINK-32964) KinesisStreamsSink cant renew credentials with WebIdentityTokenFileCredentialsProvider
[ https://issues.apache.org/jira/browse/FLINK-32964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798229#comment-17798229 ] Jan Kamieth edited comment on FLINK-32964 at 12/18/23 2:42 PM: --- Hi, we have the same issue in the same setup. Flink 1.17 with Kinesis connector org.apache.flink:flink-connector-kinesis:4.1.0-1.17 running in EKS with access provided by an IAM role in a k8s service account. After running the app, it is caught in a crash loop with the following stack trace after a couple of hours. This happened multiple times. After restarting the job manager, the app continued from last checkpoint until the next crash loop a couple of hours later. {code:java} java.lang.IllegalStateException: Connection pool shut down at org.apache.flink.kinesis.shaded.org.apache.http.util.Asserts.check(Asserts.java:34) at org.apache.flink.kinesis.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57) at org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176) at org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) at org.apache.flink.kinesis.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) at org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) at 
org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:63) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:77) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:56) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:39) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78) at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40) at
[jira] [Comment Edited] (FLINK-32964) KinesisStreamsSink cant renew credentials with WebIdentityTokenFileCredentialsProvider
[ https://issues.apache.org/jira/browse/FLINK-32964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798229#comment-17798229 ] Jan Kamieth edited comment on FLINK-32964 at 12/18/23 2:41 PM: --- Hi, we have the same issue in the same setup: Flink 1.17 with the Kinesis connector org.apache.flink:flink-connector-kinesis:4.1.0-1.17, running in EKS with access provided by an IAM role in a k8s service account. A couple of hours after the app starts, it is caught in a crash loop with the following stack trace. This has happened multiple times.

{code:java}
java.lang.IllegalStateException: Connection pool shut down
    at org.apache.flink.kinesis.shaded.org.apache.http.util.Asserts.check(Asserts.java:34)
    at org.apache.flink.kinesis.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57)
    at org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176)
    at org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
    at org.apache.flink.kinesis.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    at org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:63)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:77)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:56)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:39)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)
    at
[jira] [Created] (FLINK-33875) Support slots wait mechanism at DeclarativeSlotPoolBridge side for Default Scheduler
RocMarshal created FLINK-33875: -- Summary: Support slots wait mechanism at DeclarativeSlotPoolBridge side for Default Scheduler Key: FLINK-33875 URL: https://issues.apache.org/jira/browse/FLINK-33875 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: RocMarshal -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33874) Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler
[ https://issues.apache.org/jira/browse/FLINK-33874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] RocMarshal updated FLINK-33874: --- Summary: Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler (was: Introduce resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler) > Support resource request wait mechanism at DefaultDeclarativeSlotPool side > for Default Scheduler > > > Key: FLINK-33874 > URL: https://issues.apache.org/jira/browse/FLINK-33874 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Task >Reporter: RocMarshal >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33874) Introduce resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler
[ https://issues.apache.org/jira/browse/FLINK-33874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] RocMarshal updated FLINK-33874: --- Parent: FLINK-31757 Issue Type: Sub-task (was: New Feature) > Introduce resource request wait mechanism at DefaultDeclarativeSlotPool side > for Default Scheduler > -- > > Key: FLINK-33874 > URL: https://issues.apache.org/jira/browse/FLINK-33874 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Task >Reporter: RocMarshal >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33874) Introduce resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler
RocMarshal created FLINK-33874: -- Summary: Introduce resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler Key: FLINK-33874 URL: https://issues.apache.org/jira/browse/FLINK-33874 Project: Flink Issue Type: New Feature Components: Runtime / Task Reporter: RocMarshal -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33873) Create a Redis HyperLogLog Connector for Flink
[ https://issues.apache.org/jira/browse/FLINK-33873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798220#comment-17798220 ] Martijn Visser commented on FLINK-33873: [~jinsuichen] I've granted you the necessary permissions > Create a Redis HyperLogLog Connector for Flink > -- > > Key: FLINK-33873 > URL: https://issues.apache.org/jira/browse/FLINK-33873 > Project: Flink > Issue Type: New Feature > Components: Connectors / Redis Streams >Reporter: Jinsui Chen >Priority: Minor > Labels: features > Original Estimate: 168h > Remaining Estimate: 168h > > Redis HyperLogLog is a probabilistic data structure used for estimating the > cardinality of a dataset, which is the number of unique elements in a set. I > think it is possible to create a sink connector for HyperLogLog. > FLINK-15571 is about Redis stream connector. > Since there is no component for the Redis connector as a whole, the issue is > created under this component. -- This message was sent by Atlassian Jira (v8.20.10#820010)
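To make the cardinality-estimation idea behind the proposal concrete, here is a minimal, self-contained HyperLogLog sketch in plain Java. This is illustrative only — it is not connector code, and the class name and parameters (10-bit register index, MD5 as the hash) are choices made for this sketch; Redis' PFADD/PFCOUNT maintain the same kind of register array server-side.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Minimal HyperLogLog sketch; illustrative only, not the proposed connector. */
public class HyperLogLogDemo {
    private static final int P = 10;           // 2^10 = 1024 registers, ~3.25% std error
    private static final int M = 1 << P;
    private final byte[] registers = new byte[M];

    public void add(String value) {
        long h = hash64(value);
        int idx = (int) (h >>> (64 - P));      // first P bits select a register
        long rest = h << P;                    // remaining bits
        int rank = Long.numberOfLeadingZeros(rest) + 1; // position of first 1-bit
        if (rank > registers[idx]) {
            registers[idx] = (byte) rank;      // keep the maximum rank seen
        }
    }

    /** Harmonic-mean estimator from the original HyperLogLog paper. */
    public double estimate() {
        double sum = 0;
        for (byte r : registers) {
            sum += Math.pow(2.0, -r);
        }
        double alpha = 0.7213 / (1 + 1.079 / M);
        return alpha * M * M / sum;
    }

    private static long hash64(String value) {
        try {
            byte[] d = MessageDigest.getInstance("MD5")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xffL); // fold first 8 digest bytes into a long
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // MD5 is always available in the JDK
        }
    }

    public static void main(String[] args) {
        HyperLogLogDemo hll = new HyperLogLogDemo();
        for (int i = 0; i < 10_000; i++) {
            hll.add("user-" + i);
            hll.add("user-" + i);              // duplicates do not inflate the count
        }
        System.out.printf("estimated cardinality: %.0f%n", hll.estimate());
    }
}
```

A sink connector would replace the local register array with calls to Redis (PFADD on write, PFCOUNT to read the estimate), so the sketch lives server-side and can be merged across jobs.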
[jira] [Updated] (FLINK-32949) Allow specifying the ServerSocket port for the collect function when accessing the TaskManager from the client.
[ https://issues.apache.org/jira/browse/FLINK-32949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge updated FLINK-32949: Fix Version/s: 1.19.0 > Allow specifying the ServerSocket port for the collect function when > accessing the TaskManager from the client. > --- > > Key: FLINK-32949 > URL: https://issues.apache.org/jira/browse/FLINK-32949 > Project: Flink > Issue Type: Improvement > Components: API / Core, API / DataStream, Runtime / Configuration >Reporter: JiaJian He >Assignee: Jing Ge >Priority: Minor > Labels: pull-request-available > Fix For: 1.19.0 > > > In the context of [#12069|https://github.com/apache/flink/pull/12069], the > initialization of the {{CollectSinkFunction$ServerThread}} currently uses > port 0, which corresponds to a random port assignment. > Issues might arise under the following circumstances: > # When the JobManager and TaskManager are deployed on different servers. > # When network communication between servers requires specific ports to be > open. > # When using {{sql-client.sh}} at the JobManager to execute operations like > selecting data, the CollectSinkFunction$ServerThread running on the > TaskManager using a random port can lead to data retrieval failures. > The purpose of this pull request is to address this problem by introducing a > configuration parameter, 'taskmanager.collect.port', which allows specifying > the port for the {{{}CollectSinkFunction$ServerThread{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-32949) Allow specifying the ServerSocket port for the collect function when accessing the TaskManager from the client.
[ https://issues.apache.org/jira/browse/FLINK-32949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge resolved FLINK-32949. - Resolution: Fixed > Allow specifying the ServerSocket port for the collect function when > accessing the TaskManager from the client. > --- > > Key: FLINK-32949 > URL: https://issues.apache.org/jira/browse/FLINK-32949 > Project: Flink > Issue Type: Improvement > Components: API / Core, API / DataStream, Runtime / Configuration >Reporter: JiaJian He >Assignee: Jing Ge >Priority: Minor > Labels: pull-request-available > > In the context of [#12069|https://github.com/apache/flink/pull/12069], the > initialization of the {{CollectSinkFunction$ServerThread}} currently uses > port 0, which corresponds to a random port assignment. > Issues might arise under the following circumstances: > # When the JobManager and TaskManager are deployed on different servers. > # When network communication between servers requires specific ports to be > open. > # When using {{sql-client.sh}} at the JobManager to execute operations like > selecting data, the CollectSinkFunction$ServerThread running on the > TaskManager using a random port can lead to data retrieval failures. > The purpose of this pull request is to address this problem by introducing a > configuration parameter, 'taskmanager.collect.port', which allows specifying > the port for the {{{}CollectSinkFunction$ServerThread{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32949) Allow specifying the ServerSocket port for the collect function when accessing the TaskManager from the client.
[ https://issues.apache.org/jira/browse/FLINK-32949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798219#comment-17798219 ] Jing Ge commented on FLINK-32949: - master: e6169cee12d66ce22f7907b8dc947d4e96228b30 > Allow specifying the ServerSocket port for the collect function when > accessing the TaskManager from the client. > --- > > Key: FLINK-32949 > URL: https://issues.apache.org/jira/browse/FLINK-32949 > Project: Flink > Issue Type: Improvement > Components: API / Core, API / DataStream, Runtime / Configuration >Reporter: JiaJian He >Assignee: Jing Ge >Priority: Minor > Labels: pull-request-available > > In the context of [#12069|https://github.com/apache/flink/pull/12069], the > initialization of the {{CollectSinkFunction$ServerThread}} currently uses > port 0, which corresponds to a random port assignment. > Issues might arise under the following circumstances: > # When the JobManager and TaskManager are deployed on different servers. > # When network communication between servers requires specific ports to be > open. > # When using {{sql-client.sh}} at the JobManager to execute operations like > selecting data, the CollectSinkFunction$ServerThread running on the > TaskManager using a random port can lead to data retrieval failures. > The purpose of this pull request is to address this problem by introducing a > configuration parameter, 'taskmanager.collect.port', which allows specifying > the port for the {{{}CollectSinkFunction$ServerThread{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
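The port-0 behavior described above can be seen with a plain `java.net.ServerSocket`, independent of Flink. The sketch below is a standalone illustration, not the Flink implementation: binding to port 0 asks the OS for an ephemeral port that differs between runs (so it cannot be whitelisted in a firewall in advance), while an explicitly configured port — as the new 'taskmanager.collect.port' option allows — is predictable.

```java
import java.io.IOException;
import java.net.ServerSocket;

/** Standalone sketch of port-0 vs. explicitly configured port binding. */
public class CollectPortDemo {

    /** Binds to port 0 and returns whatever ephemeral port the OS picked. */
    static int bindEphemeral() throws IOException {
        try (ServerSocket s = new ServerSocket(0)) {
            return s.getLocalPort();
        }
    }

    /** Binds to an explicitly configured port and returns it. */
    static int bindConfigured(int port) throws IOException {
        try (ServerSocket s = new ServerSocket(port)) {
            return s.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        int ephemeral = bindEphemeral();
        System.out.println("OS-assigned port: " + ephemeral); // varies per run

        // Reuse the port we just freed as a stand-in for a value that would
        // come from 'taskmanager.collect.port' in a real deployment.
        int configured = bindConfigured(ephemeral);
        System.out.println("configured port: " + configured); // stable and known up front
    }
}
```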
Re: [PR] [FLINK-32949][core]collect tm port binding with TaskManagerOptions [flink]
JingGe merged PR #23870: URL: https://github.com/apache/flink/pull/23870 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-31481][table] Support enhanced show databases syntax [flink]
jnh5y commented on PR #23612: URL: https://github.com/apache/flink/pull/23612#issuecomment-1860558626

> Hi @jnh5y could you please check if your comments are addressed? Thanks!

Hi @jeyhunkarimov, sorry to be slow to look again. Things are looking good. @dawidwys, would you be willing to take a quick read through? I've left some comments where I think you could provide some insight about what needs to change for this work to be merged.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-31481][table] Support enhanced show databases syntax [flink]
jnh5y commented on code in PR #23612: URL: https://github.com/apache/flink/pull/23612#discussion_r1430168768

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowDatabasesConverter.java:
## @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.dql.SqlShowDatabases;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ShowDatabasesOperation;
+
+/** A converter for {@link SqlShowDatabases}. */
+public class SqlShowDatabasesConverter implements SqlNodeConverter<SqlShowDatabases> {
+
+    @Override
+    public Operation convertSqlNode(SqlShowDatabases sqlShowDatabases, ConvertContext context) {
+        if (sqlShowDatabases.getPreposition() == null) {
+            return new ShowDatabasesOperation(
+                    sqlShowDatabases.getLikeType(),
+                    sqlShowDatabases.getLikeSqlPattern(),
+                    sqlShowDatabases.isNotLike());
+        } else {
+            CatalogManager catalogManager = context.getCatalogManager();
+            String[] fullCatalogName = sqlShowDatabases.getCatalog();
+            if (fullCatalogName.length > 1) {

Review Comment: Cool. @dawidwys does that make sense? Or should we avoid having constructors throw exceptions? (I'm not sure what the best practice is here.)
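[Editor's sketch] The question raised in the review comment — whether constructors should throw — can be sidestepped with a static factory method that validates its input before constructing the object, leaving the constructor itself trivial. A minimal, hypothetical illustration; the class and method names below are illustrative and are not Flink's API:

```java
// Hypothetical sketch: validation lives in a static factory, not the constructor.
// CatalogRef and its methods are invented names for illustration only.
public final class CatalogRef {
    private final String name;

    // The constructor does no validation and therefore never throws.
    private CatalogRef(String name) {
        this.name = name;
    }

    // All validation happens here, before construction.
    public static CatalogRef of(String[] parts) {
        if (parts.length != 1) {
            throw new IllegalArgumentException(
                    "expected an unqualified catalog name, got: " + String.join(".", parts));
        }
        return new CatalogRef(parts[0]);
    }

    public String name() {
        return name;
    }
}
```

One advantage of this shape is that callers see a single validated entry point, and the class can never exist in a half-constructed state when validation fails.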
Re: [PR] [FLINK-31481][table] Support enhanced show databases syntax [flink]
jnh5y commented on code in PR #23612: URL: https://github.com/apache/flink/pull/23612#discussion_r1430167319

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowDatabasesOperation.java:
## @@ -20,26 +20,108 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.functions.SqlLikeUtils;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
 import static org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult;

 /** Operation to describe a SHOW DATABASES statement. */
 @Internal
 public class ShowDatabasesOperation implements ShowOperation {

+    private final String preposition;
+    private final String catalogName;
+    private final LikeType likeType;
+    private final String likePattern;
+    private final boolean notLike;
+
+    public ShowDatabasesOperation() {
+        // "SHOW DATABASES" command with all options being default
+        this.preposition = null;
+        this.catalogName = null;
+        this.likeType = null;
+        this.likePattern = null;
+        this.notLike = false;
+    }
+
+    public ShowDatabasesOperation(String likeType, String likePattern, boolean notLike) {
+        this.preposition = null;
+        this.catalogName = null;
+        if (likeType != null) {
+            this.likeType = LikeType.of(likeType);
+            this.likePattern = requireNonNull(likePattern, "Like pattern must not be null");
+            this.notLike = notLike;
+        } else {
+            this.likeType = null;
+            this.likePattern = null;
+            this.notLike = false;
+        }
+    }
+
+    public ShowDatabasesOperation(
+            String preposition,
+            String catalogName,
+            String likeType,
+            String likePattern,
+            boolean notLike) {
+        this.preposition = preposition;
+        this.catalogName = catalogName;
+        if (likeType != null) {
+            this.likeType = LikeType.of(likeType);
+            this.likePattern = requireNonNull(likePattern, "Like pattern must not be null");
+            this.notLike = notLike;
+        } else {
+            this.likeType = null;
+            this.likePattern = null;
+            this.notLike = false;
+        }
+    }
+
     @Override
     public String asSummaryString() {
-        return "SHOW DATABASES";
+        StringBuilder builder = new StringBuilder();
+        builder.append("SHOW DATABASES");
+        if (preposition != null) {
+            builder.append(String.format(" %s %s", preposition, catalogName));
+        }
+        if (likeType != null) {
+            if (notLike) {
+                builder.append(String.format(" NOT %s '%s'", likeType.name(), likePattern));
+            } else {
+                builder.append(String.format(" %s '%s'", likeType.name(), likePattern));
+            }
+        }
+        return builder.toString();
     }

     @Override
     public TableResultInternal execute(Context ctx) {
-        String[] databases =
-                ctx.getCatalogManager()
-                        .getCatalogOrThrowException(ctx.getCatalogManager().getCurrentCatalog())
-                        .listDatabases().stream()
-                        .sorted()
-                        .toArray(String[]::new);
-        return buildStringArrayResult("database name", databases);
+        String cName =
+                catalogName == null ? ctx.getCatalogManager().getCurrentCatalog() : catalogName;
+        List<String> databases =
+                ctx.getCatalogManager().getCatalogOrThrowException(cName).listDatabases();
+
+        if (likeType != null) {
+            databases =
+                    databases.stream()
+                            .filter(
+                                    row -> {
+                                        if (likeType == LikeType.ILIKE) {
+                                            return notLike
+                                                    != SqlLikeUtils.ilike(row, likePattern, "\\");
+                                        } else if (likeType == LikeType.LIKE) {
+                                            return notLike
+                                                    != SqlLikeUtils.like(row, likePattern, "\\");
+                                        }
+                                        return false;
+                                    })
+                            .collect(Collectors.toList());
+        }
+
+        return buildStringArrayResult(
+                "database name", databases.stream().sorted().toArray(String[]::new));

Review Comment: Thanks for improving this! I'll leave it up to @dawidwys if he has any other ideas here. I'm tempted to suggest something that doesn't collect the intermediate list. (On one hand, this code isn't in an inner
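[Editor's sketch] The filtering discussed above delegates pattern matching to Flink's internal SqlLikeUtils. The core idea can be shown standalone by translating a SQL LIKE pattern ('%' matches any run of characters, '_' matches exactly one) into a Java regex. This is a simplified sketch, not Flink's implementation — in particular, it omits the escape-character handling that SqlLikeUtils supports:

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class LikePatternDemo {

    // Translate a SQL LIKE pattern into a Java regex and match the whole value.
    // '%' -> ".*" (any run), '_' -> "." (single char); everything else is literal.
    public static boolean sqlLike(String value, String pattern) {
        StringBuilder regex = new StringBuilder();
        for (char c : pattern.toCharArray()) {
            switch (c) {
                case '%':
                    regex.append(".*");
                    break;
                case '_':
                    regex.append('.');
                    break;
                default:
                    regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return value.matches(regex.toString());
    }

    public static void main(String[] args) {
        List<String> databases = Arrays.asList("default", "db1", "db2", "prod");
        // Mimic "SHOW DATABASES LIKE 'db%'": filter, then sort for stable output.
        List<String> filtered =
                databases.stream()
                        .filter(d -> sqlLike(d, "db%"))
                        .sorted()
                        .collect(Collectors.toList());
        System.out.println(filtered); // prints [db1, db2]
    }
}
```

A NOT LIKE variant is simply the negation of the filter predicate, which is what the `notLike != ...` trick in the diff above achieves in a single expression.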