[jira] [Created] (FLINK-31223) sql-client.sh fails to start with ssl enabled
macdoor615 created FLINK-31223:
-----------------------------------
Summary: sql-client.sh fails to start with ssl enabled
Key: FLINK-31223
URL: https://issues.apache.org/jira/browse/FLINK-31223
Project: Flink
Issue Type: Bug
Components: Table SQL / Client
Affects Versions: 1.17.0
Reporter: macdoor615
Fix For: 1.17.0

*Version:* 1.17-SNAPSHOT
*Commit:* c66ef25

1. With SSL disabled, sql-client.sh works properly.
2. With SSL enabled, the web UI can be accessed at https://url, and tasks can be submitted correctly through sql-gateway. I can confirm that sql-gateway exposes the HTTP protocol, not HTTPS. However, sql-client.sh fails to start with the following exceptions; it seems that sql-client.sh expects the HTTPS protocol.

{code:java}
2023-02-25 14:43:19,317 INFO  org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'rest.port' instead of key 'rest.bind-port'
2023-02-25 14:43:19,343 INFO  org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint [] - Starting rest endpoint.
2023-02-25 14:43:19,713 INFO  org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint [] - Rest endpoint listening at localhost:44922
2023-02-25 14:43:19,715 INFO  org.apache.flink.table.client.SqlClient [] - Start embedded gateway on port 44922
2023-02-25 14:43:20,040 INFO  org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint [] - Shutting down rest endpoint.
2023-02-25 14:43:20,088 INFO  org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint [] - Shut down complete.
2023-02-25 14:43:20,089 ERROR org.apache.flink.table.client.SqlClient [] - SQL Client must stop.
org.apache.flink.table.client.SqlClientException: Failed to create the executor.
    at org.apache.flink.table.client.gateway.ExecutorImpl.<init>(ExecutorImpl.java:170) ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    at org.apache.flink.table.client.gateway.ExecutorImpl.<init>(ExecutorImpl.java:113) ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    at org.apache.flink.table.client.gateway.Executor.create(Executor.java:34) ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:110) ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:228) [flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:179) [flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Failed to get response.
    at org.apache.flink.table.client.gateway.ExecutorImpl.getResponse(ExecutorImpl.java:427) ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    at org.apache.flink.table.client.gateway.ExecutorImpl.getResponse(ExecutorImpl.java:416) ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    at org.apache.flink.table.client.gateway.ExecutorImpl.negotiateVersion(ExecutorImpl.java:447) ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    at org.apache.flink.table.client.gateway.ExecutorImpl.<init>(ExecutorImpl.java:132) ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    ... 5 more
Caused by: org.apache.flink.shaded.netty4.io.netty.handler.codec.DecoderException: org.apache.flink.shaded.netty4.io.netty.handler.ssl.NotSslRecordException: not an SSL/TLS record: 485454502f312e3120343034204e6f7420466f756e640d0a636f6e74656e742d747970653a206170706c69636174696f6e2f6a736f6e3b20636861727365743d5554462d380d0a6163636573732d636f6e74726f6c2d616c6c6f772d6f726967696e3a202a0d0a636f6e74656e742d6c656e6774683a2033380d0a0d0a7b226572726f7273223a5b224e6f7420666f756e643a202f6261642d72657175657374225d7d
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:489) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:280) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
{code}
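For reference, the hex payload inside the NotSslRecordException above decodes to a plaintext HTTP response, which confirms the diagnosis: the gateway answered in plain HTTP while the client's TLS decoder tried to parse the bytes as an SSL/TLS record. A minimal, self-contained sketch (plain JDK, no Flink dependencies; class and method names are ours, not Flink's) that decodes it:

```java
import java.nio.charset.StandardCharsets;

public class DecodeNotSslRecord {
    // Hex payload copied verbatim from the NotSslRecordException in the log above.
    static final String HEX =
            "485454502f312e3120343034204e6f7420466f756e640d0a"
                    + "636f6e74656e742d747970653a206170706c69636174696f6e2f6a736f6e3b20636861727365743d5554462d380d0a"
                    + "6163636573732d636f6e74726f6c2d616c6c6f772d6f726967696e3a202a0d0a"
                    + "636f6e74656e742d6c656e6774683a2033380d0a0d0a"
                    + "7b226572726f7273223a5b224e6f7420666f756e643a202f6261642d72657175657374225d7d";

    static String decode(String hex) {
        byte[] bytes = new byte[hex.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return new String(bytes, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        // The decoded bytes are a plain-HTTP 404 response, not a TLS record.
        System.out.println(decode(HEX));
    }
}
```

The decoded text starts with `HTTP/1.1 404 Not Found` and ends with the JSON body `{"errors":["Not found: /bad-request"]}`.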
[GitHub] [flink] flinkbot commented on pull request #22021: [Hotfix] typo hotfix in TestingDispatcher
flinkbot commented on PR #22021:
URL: https://github.com/apache/flink/pull/22021#issuecomment-1445009374

## CI report:
* 4202649dba573712f33c552efb26505abefb337d UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] xuzifu666 commented on pull request #22021: [Hotfix] typo hotfix in TestingDispatcher
xuzifu666 commented on PR #22021:
URL: https://github.com/apache/flink/pull/22021#issuecomment-1445008817

@zentol the method is never used; this hotfix removes it from TestingDispatcher.
[GitHub] [flink] xuzifu666 opened a new pull request, #22021: [Hotfix] typo hotfix in TestingDispatcher
xuzifu666 opened a new pull request, #22021:
URL: https://github.com/apache/flink/pull/22021

## What is the purpose of the change
*(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*

## Brief change log
*(for example:)*
- *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
- *Deployments RPC transmits only the blob storage reference*
- *TaskManagers retrieve the TaskInfo from the blob cache*

## Verifying this change
Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
*(Please pick either of the following options)*
This change is a trivial rework / code cleanup without any test coverage.
*(or)* This change is already covered by existing tests, such as *(please describe tests)*.
*(or)* This change added tests and can be verified as follows: *(example:)*
- *Added integration tests for end-to-end deployment with large payloads (100MB)*
- *Extended integration test for recovery after master (JobManager) failure*
- *Added test that validates that TaskInfo is transferred only once across recoveries*
- *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)

## Documentation
- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[GitHub] [flink] gaborgsomogyi commented on pull request #22009: [FLINK-31109][yarn] Support Hadoop proxy user when delegation token f…
gaborgsomogyi commented on PR #22009:
URL: https://github.com/apache/flink/pull/22009#issuecomment-1445007082

@MartijnVisser everything is green, so good to go from my side.
[GitHub] [flink] flinkbot commented on pull request #22020: [FLINK-31222] Remove usage of deprecated ConverterUtils.toApplicationId.
flinkbot commented on PR #22020:
URL: https://github.com/apache/flink/pull/22020#issuecomment-1444987349

## CI report:
* 4b7d8dcf55db34003ac5296887facfc6e5a8ce48 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-31222) Remove usage of deprecated ConverterUtils.toApplicationId
[ https://issues.apache.org/jira/browse/FLINK-31222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-31222:
-----------------------------------
Labels: pull-request-available (was: )

> Remove usage of deprecated ConverterUtils.toApplicationId
> ---------------------------------------------------------
>
> Key: FLINK-31222
> URL: https://issues.apache.org/jira/browse/FLINK-31222
> Project: Flink
> Issue Type: Improvement
> Components: Deployment / YARN
> Affects Versions: 1.17.1
> Reporter: Shilun Fan
> Priority: Major
> Labels: pull-request-available
>
> When reading the code, I found that we use ConverterUtils.toApplicationId to convert the applicationId. This method is deprecated; we should use ApplicationId.fromString instead.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] slfan1989 opened a new pull request, #22020: [FLINK-31222] Remove usage of deprecated ConverterUtils.toApplicationId.
slfan1989 opened a new pull request, #22020:
URL: https://github.com/apache/flink/pull/22020

## What is the purpose of the change
JIRA: FLINK-31222. Remove usage of deprecated ConverterUtils.toApplicationId. When reading the code, I found that we use ConverterUtils.toApplicationId to convert the applicationId. This method is deprecated; we should use ApplicationId.fromString instead.

## Brief change log
Remove usage of deprecated ConverterUtils.toApplicationId.

## Verifying this change
Manually verified.

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)

## Documentation
- Does this pull request introduce a new feature? no
[jira] [Created] (FLINK-31222) Remove usage of deprecated ConverterUtils.toApplicationId
Shilun Fan created FLINK-31222:
-----------------------------------
Summary: Remove usage of deprecated ConverterUtils.toApplicationId
Key: FLINK-31222
URL: https://issues.apache.org/jira/browse/FLINK-31222
Project: Flink
Issue Type: Improvement
Components: Deployment / YARN
Affects Versions: 1.17.1
Reporter: Shilun Fan

When reading the code, I found that we use ConverterUtils.toApplicationId to convert the applicationId. This method is deprecated; we should use ApplicationId.fromString instead.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
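The migration itself is a one-line change (`ConverterUtils.toApplicationId(s)` becomes `ApplicationId.fromString(s)`). As a rough illustration of the string format both methods parse, here is a pure-JDK sketch; the `parse` helper below is hypothetical and only mimics the parsing, it is not part of the YARN API:

```java
public class AppIdFormatSketch {
    // YARN application IDs have the form application_<clusterTimestamp>_<sequence>,
    // e.g. "application_1677300000000_0042". This hypothetical helper mimics the
    // parsing that ApplicationId.fromString performs.
    static long[] parse(String appId) {
        String[] parts = appId.split("_");
        if (parts.length != 3 || !"application".equals(parts[0])) {
            throw new IllegalArgumentException("Invalid ApplicationId: " + appId);
        }
        return new long[] {Long.parseLong(parts[1]), Long.parseLong(parts[2])};
    }

    public static void main(String[] args) {
        long[] id = parse("application_1677300000000_0042");
        System.out.println(id[0] + "/" + id[1]); // cluster timestamp / sequence number
    }
}
```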
[GitHub] [flink] JunRuiLee commented on pull request #21943: [FLINK-31077][runtime] Mark pending checkpoint onCompletionPromise complete only after the completed checkpoint is added to the store.
JunRuiLee commented on PR #21943:
URL: https://github.com/apache/flink/pull/21943#issuecomment-1444984134

@flinkbot run azure
[jira] [Updated] (FLINK-31212) Data lost on interval left join with window group
[ https://issues.apache.org/jira/browse/FLINK-31212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lyn Zhang updated FLINK-31212:
------------------------------
Summary: Data lost on interval left join with window group (was: Data lost if window group after interval left join)

> Data lost on interval left join with window group
> -------------------------------------------------
>
> Key: FLINK-31212
> URL: https://issues.apache.org/jira/browse/FLINK-31212
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.16.1
> Reporter: Lyn Zhang
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2023-02-24-17-58-44-461.png, image-2023-02-24-17-58-57-238.png, image-2023-02-24-17-59-25-179.png, image-2023-02-24-18-00-52-891.png, test.sql
>
> I have a case in [^test.sql] where records from table_1 that fail the left join are discarded by the group window.
> I checked the interval join operator implementation: if a record from the left table fails to join the right table, it is not emitted immediately but held for half of the join bound time. In test.sql, table_1 left joins table_2 with a 5-minute bound, so the output is delayed by 2.5 minutes, which causes the window to discard the records.
> h2. testing
> h4. input:
> !image-2023-02-24-17-58-44-461.png!
> {"n":"n1","ts":"2023-02-24 14:00:00"}
> {"n":"n2","ts":"2023-02-24 14:00:00"}
> {"n":"n1","ts":"2023-02-24 14:06:01"}
> !image-2023-02-24-17-58-57-238.png!
> {"n":"n1","ts":"2023-02-24 14:00:00","v":111}
> {"n":"n1","ts":"2023-02-24 14:06:01","v":111}
> h4. output:
> expect: !image-2023-02-24-18-00-52-891.png!
> real: !image-2023-02-24-17-59-25-179.png!
> I removed this logic in https://github.com/apache/flink/pull/22014. Please help to review this PR.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
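A back-of-the-envelope check of the delay described in the report above (assuming, as the reporter observed, that an unmatched left-side record is held for half of the join bound before being emitted; the helper name is ours, not Flink's):

```java
public class IntervalJoinDelaySketch {
    // Hypothetical helper: emission delay for an unmatched left record,
    // assuming it is held for half of the join bound (per the report above).
    static long emissionDelayMillis(long joinBoundMillis) {
        return joinBoundMillis / 2;
    }

    public static void main(String[] args) {
        long bound = 5 * 60 * 1000L; // the 5-minute bound from test.sql
        // 2.5 minutes of delay pushes the 14:06:01 record past the window boundary.
        System.out.println(emissionDelayMillis(bound) / 60_000.0 + " min");
    }
}
```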
[GitHub] [flink] 1996fanrui commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring
1996fanrui commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1117858061

## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:

@@ -362,30 +358,35 @@ private void testSyncSavepointWithEndInput(
                 "savepointResult");
         harness.processAll();
 
-        Assert.assertEquals(expectEndInput, TestBoundedOneInputStreamOperator.isInputEnded());
+        assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput);
     }
 
     @Test
-    public void testCleanUpExceptionSuppressing() throws Exception {
+    void testCleanUpExceptionSuppressing() throws Exception {
         try (StreamTaskMailboxTestHarness<String> testHarness =
                 new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
                         .addInput(STRING_TYPE_INFO)
                         .setupOutputForSingletonOperatorChain(new FailingTwiceOperator())
                         .build()) {
-            try {
-                testHarness.processElement(new StreamRecord<>("Doesn't matter", 0));
-                throw new RuntimeException("Expected an exception but ran successfully");
-            } catch (Exception ex) {
-                ExceptionUtils.assertThrowable(ex, ExpectedTestException.class);
-            }
+            assertThatThrownBy(
+                            () ->
+                                    testHarness.processElement(
+                                            new StreamRecord<>("Doesn't matter", 0)))
+                    .satisfies(
+                            (Consumer<Throwable>)
+                                    throwable ->
+                                            ExceptionUtils.assertThrowable(
+                                                    throwable, ExpectedTestException.class));
-            try {
-                testHarness.finishProcessing();
-            } catch (Exception ex) {
-                // todo: checking for suppression if there are more exceptions during cleanup
-                ExceptionUtils.assertThrowable(ex, FailingTwiceOperator.CloseException.class);
-            }
+            // todo: checking for suppression if there are more exceptions during cleanup
+            assertThatThrownBy(testHarness::finishProcessing)
+                    .satisfies(
+                            (ThrowingConsumer<Throwable>)
+                                    throwable ->
+                                            ExceptionUtils.assertThrowable(
+                                                    throwable,
+                                                    FailingTwiceOperator.CloseException.class));

Review Comment:
```suggestion
                    .satisfies(anyCauseMatches(FailingTwiceOperator.CloseException.class));
```
Hi @RocMarshal, thanks for your clarification, I got it.
Hi @reswqa, I found that `FlinkAssertions.anyCauseMatches` is similar to `ExceptionUtils.assertThrowable`, but it can find the throwable at a deeper level and is simpler to use. What do you think? Please also take a look at these comments [1][2]; they are similar to this one.
[1] https://github.com/apache/flink/pull/21999#discussion_r1117222444
[2] https://github.com/apache/flink/pull/21999#discussion_r1117225403
[GitHub] [flink] 1996fanrui commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring
1996fanrui commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1117222444

## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:

@@ -362,30 +358,35 @@ private void testSyncSavepointWithEndInput(
                 "savepointResult");
         harness.processAll();
 
-        Assert.assertEquals(expectEndInput, TestBoundedOneInputStreamOperator.isInputEnded());
+        assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput);
     }
 
     @Test
-    public void testCleanUpExceptionSuppressing() throws Exception {
+    void testCleanUpExceptionSuppressing() throws Exception {
         try (StreamTaskMailboxTestHarness<String> testHarness =
                 new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
                         .addInput(STRING_TYPE_INFO)
                         .setupOutputForSingletonOperatorChain(new FailingTwiceOperator())
                         .build()) {
-            try {
-                testHarness.processElement(new StreamRecord<>("Doesn't matter", 0));
-                throw new RuntimeException("Expected an exception but ran successfully");
-            } catch (Exception ex) {
-                ExceptionUtils.assertThrowable(ex, ExpectedTestException.class);
-            }
+            assertThatThrownBy(
+                            () ->
+                                    testHarness.processElement(
+                                            new StreamRecord<>("Doesn't matter", 0)))
+                    .satisfies(
+                            (Consumer<Throwable>)
+                                    throwable ->
+                                            ExceptionUtils.assertThrowable(
+                                                    throwable, ExpectedTestException.class));

Review Comment:
```suggestion
                    .satisfies(anyCauseMatches(ExpectedTestException.class));
```
[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring
RocMarshal commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1117840988

## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:

@@ -362,30 +358,35 @@ private void testSyncSavepointWithEndInput(
                 "savepointResult");
         harness.processAll();
 
-        Assert.assertEquals(expectEndInput, TestBoundedOneInputStreamOperator.isInputEnded());
+        assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput);
     }
 
     @Test
-    public void testCleanUpExceptionSuppressing() throws Exception {
+    void testCleanUpExceptionSuppressing() throws Exception {
         try (StreamTaskMailboxTestHarness<String> testHarness =
                 new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
                         .addInput(STRING_TYPE_INFO)
                         .setupOutputForSingletonOperatorChain(new FailingTwiceOperator())
                         .build()) {
-            try {
-                testHarness.processElement(new StreamRecord<>("Doesn't matter", 0));
-                throw new RuntimeException("Expected an exception but ran successfully");
-            } catch (Exception ex) {
-                ExceptionUtils.assertThrowable(ex, ExpectedTestException.class);
-            }
+            assertThatThrownBy(
+                            () ->
+                                    testHarness.processElement(
+                                            new StreamRecord<>("Doesn't matter", 0)))
+                    .satisfies(
+                            (Consumer<Throwable>)
+                                    throwable ->
+                                            ExceptionUtils.assertThrowable(
+                                                    throwable, ExpectedTestException.class));
-            try {
-                testHarness.finishProcessing();
-            } catch (Exception ex) {
-                // todo: checking for suppression if there are more exceptions during cleanup
-                ExceptionUtils.assertThrowable(ex, FailingTwiceOperator.CloseException.class);
-            }
+            // todo: checking for suppression if there are more exceptions during cleanup
+            assertThatThrownBy(testHarness::finishProcessing)
+                    .satisfies(
+                            (ThrowingConsumer<Throwable>)
+                                    throwable ->
+                                            ExceptionUtils.assertThrowable(
+                                                    throwable,
+                                                    FailingTwiceOperator.CloseException.class));

Review Comment:
Yes, thanks for the reply. This line is also my original migration line.
@1996fanrui, @reswqa described why we still need `ExceptionUtils.assertThrowable` at https://github.com/apache/flink/pull/21999#discussion_r1116493534, due to the depth of the exception.
[GitHub] [flink] snuyanzin commented on pull request #22017: [BP-1.17][FLINK-31142][Table SQL/Client] Catch TokenMgrError in SqlCommandParserImpl#scan
snuyanzin commented on PR #22017:
URL: https://github.com/apache/flink/pull/22017#issuecomment-1444782448

@reswqa this is just a backport to 1.17. Since you reviewed it at https://github.com/apache/flink/pull/22007, could you please have a look here?
[GitHub] [flink] snuyanzin commented on a diff in pull request #21993: [FLINK-31166][table] Fix array_contains does not support null argumen…
snuyanzin commented on code in PR #21993:
URL: https://github.com/apache/flink/pull/21993#discussion_r1117821456

## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayElementArgumentTypeStrategy.java:

@@ -31,24 +31,28 @@
 import java.util.Optional;
 
-import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast;
-
 /** Specific {@link ArgumentTypeStrategy} for {@link BuiltInFunctionDefinitions#ARRAY_CONTAINS}. */
 @Internal
 class ArrayElementArgumentTypeStrategy implements ArgumentTypeStrategy {
+    private final boolean preserveNullability;
+
+    public ArrayElementArgumentTypeStrategy(boolean preserveNullability) {
+        this.preserveNullability = preserveNullability;
+    }
+
     @Override
     public Optional<DataType> inferArgumentType(
             CallContext callContext, int argumentPos, boolean throwOnFailure) {
         final ArrayType haystackType =
                 (ArrayType) callContext.getArgumentDataTypes().get(0).getLogicalType();
         final LogicalType haystackElementType = haystackType.getElementType();
-        final LogicalType needleType =
-                callContext.getArgumentDataTypes().get(argumentPos).getLogicalType();
-        if (supportsImplicitCast(needleType, haystackElementType)) {

Review Comment:
Based on the contract for `org.apache.flink.table.types.inference.ArgumentTypeStrategy#inferArgumentType`, it should return an empty value when the input type cannot be inferred. Right now this change violates that contract:
https://github.com/apache/flink/blob/464ded1c2a0497255b70f711167c3b7ae52ea0f7/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/ArgumentTypeStrategy.java#L33-L48
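To make the cited contract concrete, here is a minimal sketch (illustrative names and string-based "types", not the real Flink classes) of an argument type strategy that returns `Optional.empty()` when the type cannot be inferred, instead of forcing a type:

```java
import java.util.Optional;

public class ArgumentInferenceSketch {
    // Hypothetical stand-in for ArgumentTypeStrategy#inferArgumentType: when the
    // needle type cannot be cast to the array's element type, the contract says
    // to return Optional.empty() (or throw if throwOnFailure is set), rather
    // than silently returning the element type anyway.
    static Optional<String> inferArgumentType(
            String needleType, String haystackElementType, boolean throwOnFailure) {
        boolean castable = needleType.equals(haystackElementType) || needleType.equals("NULL");
        if (!castable) {
            if (throwOnFailure) {
                throw new IllegalArgumentException(
                        "Cannot cast " + needleType + " to " + haystackElementType);
            }
            return Optional.empty(); // honor the contract: empty, not a forced type
        }
        return Optional.of(haystackElementType);
    }
}
```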
[jira] [Commented] (FLINK-31142) Some queries lead to abrupt sql client close
[ https://issues.apache.org/jira/browse/FLINK-31142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693366#comment-17693366 ] Sergey Nuyanzin commented on FLINK-31142: - Merged to master: 464ded1c2a0497255b70f711167c3b7ae52ea0f7 > Some queries lead to abrupt sql client close > > > Key: FLINK-31142 > URL: https://issues.apache.org/jira/browse/FLINK-31142 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.17.0 >Reporter: Sergey Nuyanzin >Priority: Blocker > Labels: pull-request-available > > Although the behavior has been changed in 1.17.0, I'm not sure whether it is > a blocker or not, since in both cases it is invalid query. > I put it to blocker just because of regression. > The difference in the behavior is that before 1.17.0 > a query like > {code:sql} > select /* multiline comment; > {code} > fails to execute and sql client prompts to submit another query. > In 1.17.0 it shuts down the session failing with > {noformat} > Exception in thread "main" org.apache.flink.table.client.SqlClientException: > Could not read from command line. > at > org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:205) > at > org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:168) > at > org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:113) > at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:169) > at org.apache.flink.table.client.SqlClient.start(SqlClient.java:118) > at > org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:228) > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:179) > Caused by: org.apache.flink.sql.parser.impl.TokenMgrError: Lexical error at > line 1, column 29. 
Encountered: <EOF> after : "" > at > org.apache.flink.sql.parser.impl.FlinkSqlParserImplTokenManager.getNextToken(FlinkSqlParserImplTokenManager.java:26752) > at > org.apache.flink.table.client.cli.parser.SqlCommandParserImpl$TokenIterator.scan(SqlCommandParserImpl.java:89) > at > org.apache.flink.table.client.cli.parser.SqlCommandParserImpl$TokenIterator.next(SqlCommandParserImpl.java:81) > at > org.apache.flink.table.client.cli.parser.SqlCommandParserImpl.checkIncompleteStatement(SqlCommandParserImpl.java:141) > at > org.apache.flink.table.client.cli.parser.SqlCommandParserImpl.getCommand(SqlCommandParserImpl.java:111) > at > org.apache.flink.table.client.cli.parser.SqlCommandParserImpl.parseStatement(SqlCommandParserImpl.java:52) > at > org.apache.flink.table.client.cli.parser.SqlMultiLineParser.parse(SqlMultiLineParser.java:82) > at > org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2964) > at > org.jline.reader.impl.LineReaderImpl$1.apply(LineReaderImpl.java:3778) > at > org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:679) > at > org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:183) > ... 6 more > Shutting down the session... > done. > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
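What the stack trace shows is a lexer failure escaping as an Error: JavaCC's TokenMgrError extends java.lang.Error, not Exception, so a `catch (Exception ...)` guard around the read loop does not stop it and the session dies. A self-contained sketch of the statement-level handling described for pre-1.17 behavior (stand-in classes, not Flink's actual parser):

```java
public class LexerErrorSketch {

    // Stand-in for JavaCC's TokenMgrError, which extends Error, not Exception.
    static class TokenMgrError extends Error {
        TokenMgrError(String message) {
            super(message);
        }
    }

    // Stand-in lexer: rejects an unterminated block comment, as in the report.
    static void scan(String sql) {
        if (sql.contains("/*") && !sql.contains("*/")) {
            throw new TokenMgrError("Lexical error: unterminated comment");
        }
    }

    // Statement-level handling that keeps the session alive: the lexer Error is
    // caught explicitly, the statement is rejected, and the client can prompt again.
    static boolean acceptStatement(String sql) {
        try {
            scan(sql);
            return true;
        } catch (TokenMgrError e) { // catch (Exception e) would NOT stop this
            return false;
        }
    }
}
```

With handling like this, `select /* multiline comment;` is rejected as an invalid statement instead of shutting the whole session down.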
[GitHub] [flink] flinkbot commented on pull request #22019: [FLINK-31221] Fix Typo in YarnConfigOptions.
flinkbot commented on PR #22019: URL: https://github.com/apache/flink/pull/22019#issuecomment-1444576741 ## CI report: * 830cda0aa78f44f17c0e2e05e3f91657bb9863e7 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] slfan1989 commented on pull request #22019: [FLINK-31221] Fix Typo in YarnConfigOptions.
slfan1989 commented on PR #22019: URL: https://github.com/apache/flink/pull/22019#issuecomment-1444573260 After I found this typo, I checked all the code under the flink-yarn module; this is the only typo.
[jira] [Updated] (FLINK-31221) Fix Typo in YarnConfigOptions
[ https://issues.apache.org/jira/browse/FLINK-31221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31221: --- Labels: pull-request-available (was: ) > Fix Typo in YarnConfigOptions > - > > Key: FLINK-31221 > URL: https://issues.apache.org/jira/browse/FLINK-31221 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.17.1 >Reporter: Shilun Fan >Priority: Major > Labels: pull-request-available > > I found a typo in YarnConfigOptions, I will fix it. > willl -> will > {code:java} > public static final ConfigOption<String> LOCALIZED_KEYTAB_PATH = > . > "Local (on NodeManager) path where kerberos keytab > file will be" > + " localized to. If " > + SHIP_LOCAL_KEYTAB.key() > + " set to " > + "true, Flink willl ship the keytab file as > a YARN local " > + "resource. In this case, the path is > relative to the local " > + "resource directory. If set to false, Flink" > + " will try to directly locate the keytab > from the path itself."); > {code}
[GitHub] [flink] slfan1989 opened a new pull request, #22019: [FLINK-31221] Fix Typo in YarnConfigOptions.
slfan1989 opened a new pull request, #22019: URL: https://github.com/apache/flink/pull/22019 ## What is the purpose of the change JIRA: FLINK-31221. Fix Typo in YarnConfigOptions. I found a typo in YarnConfigOptions, I will fix it ## Brief change log Fix Typo Issue. ## Verifying this change Manually verified. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[jira] [Created] (FLINK-31221) Fix Typo in YarnConfigOptions
Shilun Fan created FLINK-31221: -- Summary: Fix Typo in YarnConfigOptions Key: FLINK-31221 URL: https://issues.apache.org/jira/browse/FLINK-31221 Project: Flink Issue Type: Improvement Components: Deployment / YARN Affects Versions: 1.17.1 Reporter: Shilun Fan I found a typo in YarnConfigOptions, I will fix it. willl -> will {code:java} public static final ConfigOption<String> LOCALIZED_KEYTAB_PATH = . "Local (on NodeManager) path where kerberos keytab file will be" + " localized to. If " + SHIP_LOCAL_KEYTAB.key() + " set to " + "true, Flink willl ship the keytab file as a YARN local " + "resource. In this case, the path is relative to the local " + "resource directory. If set to false, Flink" + " will try to directly locate the keytab from the path itself."); {code}
[jira] [Updated] (FLINK-31216) Update kryo to current
[ https://issues.apache.org/jira/browse/FLINK-31216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Morey Straus updated FLINK-31216: - Description: kryo 2.24 is several years out of date and has a [deserialization vulnerability|https://github.com/EsotericSoftware/kryo/issues/942] associated with it. Please update to current. (was: kryo 2.24 is several years out of date and has a deserialization vulnerability associated with it. Please update to current.) > Update kryo to current > -- > > Key: FLINK-31216 > URL: https://issues.apache.org/jira/browse/FLINK-31216 > Project: Flink > Issue Type: Technical Debt >Reporter: Morey Straus >Priority: Major > Labels: security > > kryo 2.24 is several years out of date and has a [deserialization > vulnerability|https://github.com/EsotericSoftware/kryo/issues/942] associated > with it. Please update to current.
[jira] [Commented] (FLINK-22793) HybridSource Table Implementation
[ https://issues.apache.org/jira/browse/FLINK-22793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693339#comment-17693339 ] Thomas Weise commented on FLINK-22793: -- [~nicholasjiang] thanks for the confirmation! > HybridSource Table Implementation > - > > Key: FLINK-22793 > URL: https://issues.apache.org/jira/browse/FLINK-22793 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Common >Reporter: Nicholas Jiang >Assignee: Ran Tao >Priority: Major >
[jira] [Assigned] (FLINK-22793) HybridSource Table Implementation
[ https://issues.apache.org/jira/browse/FLINK-22793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise reassigned FLINK-22793: Assignee: Ran Tao (was: Nicholas Jiang) > HybridSource Table Implementation > - > > Key: FLINK-22793 > URL: https://issues.apache.org/jira/browse/FLINK-22793 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Common >Reporter: Nicholas Jiang >Assignee: Ran Tao >Priority: Major >
[jira] [Commented] (FLINK-22793) HybridSource Table Implementation
[ https://issues.apache.org/jira/browse/FLINK-22793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693322#comment-17693322 ] Nicholas Jiang commented on FLINK-22793: [~thw], of course, this can be assigned to [~lemonjing]. > HybridSource Table Implementation > - > > Key: FLINK-22793 > URL: https://issues.apache.org/jira/browse/FLINK-22793 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Common >Reporter: Nicholas Jiang >Assignee: Nicholas Jiang >Priority: Major >
[jira] [Commented] (FLINK-31040) Looping pattern notFollowedBy at end missing an element
[ https://issues.apache.org/jira/browse/FLINK-31040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693321#comment-17693321 ] Nicholas Jiang commented on FLINK-31040: [~Juntao Hu], [~martijnvisser], thanks for the report. The looping pattern with notFollowedBy at the end indeed misses an element. Would you like to fix this? > Looping pattern notFollowedBy at end missing an element > --- > > Key: FLINK-31040 > URL: https://issues.apache.org/jira/browse/FLINK-31040 > Project: Flink > Issue Type: Bug > Components: Library / CEP >Affects Versions: 1.17.0, 1.15.3, 1.16.1 >Reporter: Juntao Hu >Priority: Major > > Pattern: begin("A", > SKIP_TO_NEXT).oneOrMore().consecutive().notFollowedBy("B").within(Time.milliseconds(3)) > Sequence: will produce results > [a1, a2], [a2, a3], [a3], which obviously should be [a1, a2, a3], [a2, a3, > a4], [a3, a4], [a4].
[GitHub] [flink] flinkbot commented on pull request #22018: [docs][hotfix] correct label in config configuration
flinkbot commented on PR #22018: URL: https://github.com/apache/flink/pull/22018#issuecomment-1444277820 ## CI report: * 3e076d84d04b65e4a7fb6bfffe60b70643951629 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] Samrat002 opened a new pull request, #22018: [docs][hotfix] correct label in config configuration
Samrat002 opened a new pull request, #22018: URL: https://github.com/apache/flink/pull/22018 ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) no - The serializers: (yes / no / don't know) no - The runtime per-record code paths (performance sensitive): (yes / no / don't know) no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) no - The S3 file system connector: (yes / no / don't know) no ## Documentation - Does this pull request introduce a new feature? (yes / no) no - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) no
[jira] [Created] (FLINK-31220) Replace Pod with PodTemplateSpec for the pod template properties
Gyula Fora created FLINK-31220: -- Summary: Replace Pod with PodTemplateSpec for the pod template properties Key: FLINK-31220 URL: https://issues.apache.org/jira/browse/FLINK-31220 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Gyula Fora The current podtemplate fields in the CR use the Pod object for their schema. This doesn't make sense, as status and other fields should never be specified and have no effect. We should replace this with PodTemplateSpec and make sure that this is not a breaking change even if users incorrectly specified status before.
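A hypothetical, heavily simplified model of the proposed schema change (local stand-in classes, not the fabric8 Kubernetes types): with a Pod-shaped template a status can be set but never takes effect, while a PodTemplateSpec-shaped template has no such field at all, and a migration can simply drop any previously stored status so old CRs keep deserializing.

```java
public class TemplateSchemaSketch {

    static class ObjectMeta { String name; }
    static class PodSpec { String serviceAccountName; }
    static class PodStatus { String phase; }

    // Pod-shaped template: status is representable but meaningless in a template.
    static class Pod {
        ObjectMeta metadata;
        PodSpec spec;
        PodStatus status;
    }

    // PodTemplateSpec-shaped template: only metadata + spec exist.
    static class PodTemplateSpec {
        ObjectMeta metadata;
        PodSpec spec;
    }

    // Non-breaking migration sketch: accept an old Pod-shaped value and
    // silently discard its status.
    static PodTemplateSpec fromPod(Pod pod) {
        PodTemplateSpec template = new PodTemplateSpec();
        template.metadata = pod.metadata;
        template.spec = pod.spec;
        return template; // pod.status intentionally dropped
    }
}
```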
[jira] [Commented] (FLINK-31066) Release Testing: Verify FLINK-29932 Upgrade Calcite to 1.29.0
[ https://issues.apache.org/jira/browse/FLINK-31066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693275#comment-17693275 ] Matthias Pohl commented on FLINK-31066: --- I should have remembered that backwards compatibility between minor Flink versions with a changed execution plan is not given [[1]|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution] [[2]|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#table-api--sql]. So, I guess, there's no point in checking this. [~Sergey Nuyanzin] do you consider it good enough to prove that the Calcite update could be visualized through the Flink UI? > Release Testing: Verify FLINK-29932 Upgrade Calcite to 1.29.0 > - > > Key: FLINK-31066 > URL: https://issues.apache.org/jira/browse/FLINK-31066 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Sergey Nuyanzin >Assignee: Matthias Pohl >Priority: Major > Labels: release-testing > Attachments: SavepointReleaseTesting.java > > > In fact this is a task to check all 3 Calcite upgrade related issues (1.27.0, > 1.28.0 and 1.29.0) > Since there were added optimization for Sarg in Calcite 1.27.0 it would make > sense to check that different queries with Sarg operator are working ok. > Also would make sense to check that SQL jobs with Sarg related queries could > be restored from previous Flink version. > An example of SQL > {code:sql} > SELECT a FROM MyTable WHERE a = 1 or a = 2 or a IS NOT NULL;{code} > {code:sql} > SELECT a FROM MyTable WHERE a = 1 or a = 2 or a IS NULL; > {code} > where MyTable is for instance > {code:sql} > CREATE TABLE MyTable ( > a bigint, > b int not null, > c varchar, > d timestamp(3) > ) with (...) > {code}
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
rkhachatryan commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1117292622 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java: ## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.OperatorIDPair; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.KeyedStateHandle; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toMap; + +/** Managed Keyed State size estimates used to make scheduling decisions. 
 */
+class StateSizeEstimates {
+    private final Map<JobVertexID, Long> averages;
+
+    public StateSizeEstimates() {
+        this(Collections.emptyMap());
+    }
+
+    public StateSizeEstimates(Map<JobVertexID, Long> averages) {
+        this.averages = averages;
+    }
+
+    public Optional<Long> estimate(JobVertexID jobVertexId) {
+        return Optional.ofNullable(averages.get(jobVertexId));
+    }
+
+    static StateSizeEstimates empty() {
+        return new StateSizeEstimates();
+    }
+
+    static StateSizeEstimates fromGraph(@Nullable ExecutionGraph executionGraph) {
+        return Optional.ofNullable(executionGraph)
+                .flatMap(graph -> Optional.ofNullable(graph.getCheckpointCoordinator()))
+                .flatMap(coordinator -> Optional.ofNullable(coordinator.getCheckpointStore()))
+                .flatMap(store -> Optional.ofNullable(store.getLatestCheckpoint()))
+                .map(cp -> build(fromCompletedCheckpoint(cp), mapVerticesToOperators(executionGraph)))
+                .orElse(empty());
+    }
+
+    private static StateSizeEstimates build(
+            Map<OperatorID, Long> sizePerOperator,
+            Map<JobVertexID, Set<OperatorID>> verticesToOperators) {
+        Map<JobVertexID, Long> verticesToSizes =
+                verticesToOperators.entrySet().stream()
+                        .collect(toMap(Map.Entry::getKey, e -> size(e.getValue(), sizePerOperator)));
+        return new StateSizeEstimates(verticesToSizes);
+    }
+
+    private static long size(Set<OperatorID> ids, Map<OperatorID, Long> sizes) {
+        return ids.stream()
+                .mapToLong(key -> sizes.getOrDefault(key, 0L))
+                .boxed()
+                .reduce(Long::sum)
+                .orElse(0L);
+    }
+
+    private static Map<JobVertexID, Set<OperatorID>> mapVerticesToOperators(
+            ExecutionGraph executionGraph) {
+        return executionGraph.getAllVertices().entrySet().stream()
+                .collect(toMap(Map.Entry::getKey, e -> getOperatorIDS(e.getValue())));
+    }
+
+    private static Set<OperatorID> getOperatorIDS(ExecutionJobVertex v) {
+        return v.getOperatorIDs().stream()
+                .map(OperatorIDPair::getGeneratedOperatorID)
+                .collect(Collectors.toSet());
+    }
+
+    private static Map<OperatorID, Long> fromCompletedCheckpoint(CompletedCheckpoint cp) {
+        Stream<Map.Entry<OperatorID, OperatorState>> states =
+                cp.getOperatorStates().entrySet().stream();
+        Map<OperatorID, Long> estimates =
+                states.collect(toMap(Map.Entry::getKey, e -> estimateKeyGroupStateSize(e.getValue())));
+        return estimates;
+    }
+
+    private static long estimateKeyGroupStateSize(OperatorState state) {
+        Stream<KeyedStateHandle> handles =
+                state.getSubtaskStates().values().stream()
+                        .flatMap(s -> s.getManagedKeyedState().stream());
+        Stream<Tuple2<Long, Integer>> sizeAndCount =
+                handles.map(
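The size(...) helper in the quoted diff reduces per-operator state sizes to a per-vertex total, counting operators without a recorded size as zero. A minimal stand-alone version (String keys standing in for OperatorID) behaves the same way:

```java
import java.util.Map;
import java.util.Set;

public class SizeSketch {

    // Sum the known per-operator state sizes for one vertex; operators with no
    // recorded size contribute 0 rather than failing the lookup.
    static long size(Set<String> operatorIds, Map<String, Long> sizesPerOperator) {
        return operatorIds.stream()
                .mapToLong(id -> sizesPerOperator.getOrDefault(id, 0L))
                .sum();
    }

    public static void main(String[] args) {
        Map<String, Long> sizes = Map.of("op-1", 10L, "op-2", 5L);
        System.out.println(size(Set.of("op-1", "op-2"), sizes)); // 15
        System.out.println(size(Set.of("op-1", "op-3"), sizes)); // 10 (op-3 unknown)
    }
}
```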
[GitHub] [flink] twalthr commented on a diff in pull request #19623: [FLINK-27471][table] Add ARRAY_DISTINCT function
twalthr commented on code in PR #19623: URL: https://github.com/apache/flink/pull/19623#discussion_r1117290234 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayDistinctFunction.java: ## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.LinkedHashSet; +import java.util.Set; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_DISTINCT}. 
*/ +@Internal +public class ArrayDistinctFunction extends BuiltInScalarFunction { +private final ArrayData.ElementGetter elementGetter; + +public ArrayDistinctFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.ARRAY_DISTINCT, context); +final DataType dataType = +((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) +.getElementDataType(); +elementGetter = ArrayData.createElementGetter(dataType.getLogicalType()); +} + +public @Nullable ArrayData eval(ArrayData haystack) { +try { +if (haystack == null) { +return null; +} +Set<Object> set = new LinkedHashSet<>(); +final int size = haystack.size(); +for (int pos = 0; pos < size; pos++) { +final Object element = elementGetter.getElementOrNull(haystack, pos); +set.add(element); Review Comment: I don’t have the time to find a bug, but I just wanted to point out the existence of the expression evaluator to get access to primitive functions such as CAST or EQUALS. CC @liuyongvs
[GitHub] [flink] twalthr commented on a diff in pull request #19623: [FLINK-27471][table] Add ARRAY_DISTINCT function
twalthr commented on code in PR #19623: URL: https://github.com/apache/flink/pull/19623#discussion_r1117290234 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayDistinctFunction.java: ## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.LinkedHashSet; +import java.util.Set; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_DISTINCT}. 
*/ +@Internal +public class ArrayDistinctFunction extends BuiltInScalarFunction { +private final ArrayData.ElementGetter elementGetter; + +public ArrayDistinctFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.ARRAY_DISTINCT, context); +final DataType dataType = +((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) +.getElementDataType(); +elementGetter = ArrayData.createElementGetter(dataType.getLogicalType()); +} + +public @Nullable ArrayData eval(ArrayData haystack) { +try { +if (haystack == null) { +return null; +} +Set<Object> set = new LinkedHashSet<>(); +final int size = haystack.size(); +for (int pos = 0; pos < size; pos++) { +final Object element = elementGetter.getElementOrNull(haystack, pos); +set.add(element); Review Comment: I don’t have the time to find a bug, but I just wanted to point out the existence of the expression evaluator to get access to primitive functions such as CAST or EQUALS. CC @luoyuxia
[GitHub] [flink] twalthr commented on a diff in pull request #19623: [FLINK-27471][table] Add ARRAY_DISTINCT function
twalthr commented on code in PR #19623: URL: https://github.com/apache/flink/pull/19623#discussion_r1117286705 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayDistinctFunction.java: ## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.LinkedHashSet; +import java.util.Set; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_DISTINCT}. 
*/ +@Internal +public class ArrayDistinctFunction extends BuiltInScalarFunction { +private final ArrayData.ElementGetter elementGetter; + +public ArrayDistinctFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.ARRAY_DISTINCT, context); +final DataType dataType = +((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) +.getElementDataType(); +elementGetter = ArrayData.createElementGetter(dataType.getLogicalType()); +} + +public @Nullable ArrayData eval(ArrayData haystack) { +try { +if (haystack == null) { +return null; +} +Set set = new LinkedHashSet<>(); +final int size = haystack.size(); +for (int pos = 0; pos < size; pos++) { +final Object element = elementGetter.getElementOrNull(haystack, pos); +set.add(element); Review Comment: @snuyanzin We need to be careful with equality of `RowData`, `MapData` and others. I'm not saying that this implementation is wrong, maybe it works for most cases. But I'm also not sure whether there is a hidden bug. If you look at `BinaryRowData`, it compares binary sections. Which means that equality is not based on SQL semantics. This is actually why I added `SpecializedFunction` with a `ExpressionEvaluatorFactory`. Only with this piece it is now possible to call the SQL compliant equals function which can be used to assemble a proper distinct for all data types. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
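The distinction the comment draws between binary equality and SQL-semantic equality can be illustrated outside Flink. The following self-contained sketch (an analogy, not Flink code) uses `java.math.BigDecimal`, whose `equals` is scale-sensitive while `compareTo` implements numeric equality — roughly mirroring how a `LinkedHashSet` over binary row representations would deduplicate by byte-level equality rather than by a SQL-compliant EQUALS:

```java
import java.math.BigDecimal;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.TreeSet;

/** Illustrates how the result of a set-based distinct depends on the equality the set uses. */
public class DistinctEqualityDemo {

    // Object equality: BigDecimal.equals is scale-sensitive (1.0 != 1.00),
    // analogous to BinaryRowData comparing raw binary sections.
    static int distinctByObjectEquals() {
        Set<BigDecimal> set = new LinkedHashSet<>();
        set.add(new BigDecimal("1.0"));
        set.add(new BigDecimal("1.00"));
        return set.size(); // 2: object/binary equality keeps both
    }

    // Semantic equality: compareTo treats 1.0 and 1.00 as the same number,
    // analogous to the SQL-compliant equals obtainable via the expression evaluator.
    static int distinctBySemanticEquals() {
        Set<BigDecimal> set = new TreeSet<>(BigDecimal::compareTo);
        set.add(new BigDecimal("1.0"));
        set.add(new BigDecimal("1.00"));
        return set.size(); // 1: semantic equality deduplicates them
    }

    public static void main(String[] args) {
        System.out.println(distinctByObjectEquals());   // 2
        System.out.println(distinctBySemanticEquals()); // 1
    }
}
```

This is exactly the kind of divergence the reviewer warns about for `RowData` and `MapData` elements: the distinct result can silently differ depending on which notion of equality the set applies.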
[GitHub] [flink] venkata91 commented on a diff in pull request #22009: [FLINK-31109][yarn] Support Hadoop proxy user when delegation token f…
venkata91 commented on code in PR #22009: URL: https://github.com/apache/flink/pull/22009#discussion_r1117272256 ## flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProviderITCase.java: ## @@ -57,7 +58,7 @@ public void isLoginPossibleMustReturnFalseByDefault() throws IOException { UserGroupInformation userGroupInformation = mock(UserGroupInformation.class); ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); -assertFalse(kerberosLoginProvider.isLoginPossible()); +assertFalse(kerberosLoginProvider.isLoginPossible(true)); Review Comment: Addressed it.
[jira] [Commented] (FLINK-31207) Supports high order function like other engine
[ https://issues.apache.org/jira/browse/FLINK-31207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693263#comment-17693263 ] Timo Walther commented on FLINK-31207: -- [~jackylau] I would propose to make this a top-level issue. It goes beyond adding a built-in function but adds a new category of functions. > Supports high order function like other engine > -- > > Key: FLINK-31207 > URL: https://issues.apache.org/jira/browse/FLINK-31207 > Project: Flink > Issue Type: Sub-task >Reporter: jackylau >Priority: Major > > spark [https://spark.apache.org/docs/latest/api/sql/index.html#transform] > transform/transform_keys/transform_values > after Calcite https://issues.apache.org/jira/browse/CALCITE-3679 supports > higher-order functions, we should support many higher-order functions like > Spark/Presto -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31219) Add support for canary resources
Gyula Fora created FLINK-31219: -- Summary: Add support for canary resources Key: FLINK-31219 URL: https://issues.apache.org/jira/browse/FLINK-31219 Project: Flink Issue Type: New Feature Components: Kubernetes Operator Reporter: Gyula Fora Assignee: Gyula Fora While the current health probe mechanism is able to detect different types of errors like startup / informer issues, it can be generally beneficial to allow a simple canary mechanism that can verify that the operator receives updates and reconciles resources in a timely manner.
[GitHub] [flink] venkata91 commented on a diff in pull request #22009: [FLINK-31109][yarn] Support Hadoop proxy user when delegation token f…
venkata91 commented on code in PR #22009: URL: https://github.com/apache/flink/pull/22009#discussion_r1117217398 ## flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java: ## @@ -70,10 +71,22 @@ public void install() throws SecurityInstallException { try { KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(securityConfig); -if (kerberosLoginProvider.isLoginPossible()) { -kerberosLoginProvider.doLogin(); +if (kerberosLoginProvider.isLoginPossible(true)) { +kerberosLoginProvider.doLogin(true); loginUser = UserGroupInformation.getLoginUser(); +if (HadoopUserUtils.isProxyUser((loginUser)) Review Comment: Just wanted to confirm that we don't need any change right now here? Rather your comment is more for future, handling proxy user support w/ non-hadoop DT like S3. I thought about this case as well. Good point!
[jira] [Created] (FLINK-31218) Improve health probe to recognize informers already failed at start
Gyula Fora created FLINK-31218: -- Summary: Improve health probe to recognize informers already failed at start Key: FLINK-31218 URL: https://issues.apache.org/jira/browse/FLINK-31218 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Gyula Fora Assignee: Gyula Fora The current health probe will report unhealthy if any informers cannot start. This contradicts the setting that allows the operator to start while some informers are unhealthy (and keep trying to start them).
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
rkhachatryan commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1117237527 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocationsInfo.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; + +import static java.util.Collections.emptyMap; + +class AllocationsInfo { Review Comment: Yes, I agree. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] 1996fanrui commented on pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring
1996fanrui commented on PR #21999: URL: https://github.com/apache/flink/pull/21999#issuecomment-1443901443 And please squash all review commits into the first commit after addressing all comments, thanks~
[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
dmvk commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1117232460 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java: ## @@ -784,7 +788,7 @@ public ArchivedExecutionGraph getArchivedExecutionGraph( } @Override -public void goToWaitingForResources() { +public void goToWaitingForResources(ExecutionGraph executionGraph) { Review Comment: ```suggestion public void goToWaitingForResources(@Nullable ExecutionGraph executionGraph) { ```
[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
dmvk commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1117232018 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java: ## @@ -65,17 +69,18 @@ class WaitingForResources implements State, ResourceConsumer { desiredResources, initialResourceAllocationTimeout, resourceStabilizationTimeout, -SystemClock.getInstance()); +SystemClock.getInstance(), +null); } -@VisibleForTesting WaitingForResources( Context context, Logger log, ResourceCounter desiredResources, Duration initialResourceAllocationTimeout, Duration resourceStabilizationTimeout, -Clock clock) { +Clock clock, +ExecutionGraph previousExecutionGraph) { Review Comment: ```suggestion @Nullable ExecutionGraph previousExecutionGraph) { ```
[GitHub] [flink] twalthr commented on a diff in pull request #21500: [FLINK-27995][table] Upgrade Janino version to 3.1.9
twalthr commented on code in PR #21500: URL: https://github.com/apache/flink/pull/21500#discussion_r1117229017 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala: ## @@ -1020,16 +1021,19 @@ object CodeGenUtils { } // convert internal format to target type -val externalResultTerm = if (isInternalClass(targetDataType)) { - s"($targetTypeTerm) ${internalExpr.resultTerm}" +val (externalResultTerm, resultType) = if (isInternalClass(targetDataType)) { Review Comment: can we use more meaningful names here? `resultType` does not improve code readability. `externalResultTypeTerm` is the right name here. ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala: ## @@ -922,7 +922,8 @@ object CodeGenUtils { if (targetDataType.getConversionClass.isPrimitive) { externalResultTerm } else { - s"${internalExpr.nullTerm} ? null : ($externalResultTerm)" + // Cast of null is required because of janino issue https://github.com/janino-compiler/janino/issues/188 + s"${internalExpr.nullTerm} ? (${typeTerm(targetDataType.getConversionClass)}) null : ($externalResultTerm)" Review Comment: introduce a variable `externalResultTypeTerm = typeTerm(targetDataType.getConversionClass)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
dmvk commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1117230371 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java: ## @@ -978,8 +983,8 @@ public CreatingExecutionGraph.AssignmentResult tryToAssignSlots( executionGraph.setInternalTaskFailuresListener( new UpdateSchedulerNgOnInternalFailuresListener(this)); -final VertexParallelism vertexParallelism = -executionGraphWithVertexParallelism.getVertexParallelism(); +final JobSchedulingPlan vertexParallelism = Review Comment: nit: rename variable
[GitHub] [flink] 1996fanrui commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring
1996fanrui commented on code in PR #21999: URL: https://github.com/apache/flink/pull/21999#discussion_r1117225403 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java: ## @@ -1103,35 +1143,41 @@ private void testFailToConfirmCheckpointMessage(Consumer> consu * finished. */ @Test -public void testCheckpointFailueOnClosedOperator() throws Throwable { +void testCheckpointFailueOnClosedOperator() throws Exception { ClosingOperator operator = new ClosingOperator<>(); StreamTaskMailboxTestHarnessBuilder builder = new StreamTaskMailboxTestHarnessBuilder<>( OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO) .addInput(BasicTypeInfo.INT_TYPE_INFO); try (StreamTaskMailboxTestHarness harness = builder.setupOutputForSingletonOperatorChain(operator).build()) { -// keeps the mailbox from suspending -harness.setAutoProcess(false); -harness.processElement(new StreamRecord<>(1)); - -harness.streamTask.operatorChain.finishOperators( -harness.streamTask.getActionExecutor(), StopMode.DRAIN); -harness.streamTask.operatorChain.closeAllOperators(); -assertTrue(ClosingOperator.closed.get()); - -harness.streamTask.triggerCheckpointOnBarrier( -new CheckpointMetaData(1, 0), -CheckpointOptions.forCheckpointWithDefaultLocation(), -new CheckpointMetricsBuilder()); -} catch (Exception ex) { -ExceptionUtils.assertThrowableWithMessage( -ex, "OperatorChain and Task should never be closed at this point"); +assertThatThrownBy( +() -> { +// keeps the mailbox from suspending +harness.setAutoProcess(false); +harness.processElement(new StreamRecord<>(1)); + + harness.streamTask.operatorChain.finishOperators( + harness.streamTask.getActionExecutor(), StopMode.DRAIN); + harness.streamTask.operatorChain.closeAllOperators(); + assertThat(ClosingOperator.closed.get()).isTrue(); + +harness.streamTask.triggerCheckpointOnBarrier( +new CheckpointMetaData(1, 0), + CheckpointOptions.forCheckpointWithDefaultLocation(), +new CheckpointMetricsBuilder()); +}) +.satisfies( 
+(Consumer) +throwable -> + ExceptionUtils.assertThrowableWithMessage( +throwable, +"OperatorChain and Task should never be closed at this point")); Review Comment: ```suggestion .hasRootCauseMessage("OperatorChain and Task should never be closed at this point"); ``` It works.
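The suggested `hasRootCauseMessage` matcher asserts on the message of the innermost cause in the exception chain. A minimal stdlib-only sketch of that walk (not the AssertJ implementation) looks like:

```java
/** Sketch of what a root-cause-message check does: walk to the deepest cause. */
public class RootCause {

    // Follow getCause() until the end of the chain (guarding against
    // self-referential causes) and return that throwable's message.
    static String rootCauseMessage(Throwable t) {
        Throwable cur = t;
        while (cur.getCause() != null && cur.getCause() != cur) {
            cur = cur.getCause();
        }
        return cur.getMessage();
    }

    public static void main(String[] args) {
        Exception root = new IllegalStateException(
                "OperatorChain and Task should never be closed at this point");
        Exception wrapped = new RuntimeException("checkpoint failed", root);
        System.out.println(rootCauseMessage(wrapped));
    }
}
```

This is why the one-line matcher can replace the verbose `satisfies(...)` lambda: the harness wraps the original failure, and only the root message is the interesting part.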
[GitHub] [flink] 1996fanrui commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring
1996fanrui commented on code in PR #21999: URL: https://github.com/apache/flink/pull/21999#discussion_r1117222444 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java: ## @@ -362,30 +358,35 @@ private void testSyncSavepointWithEndInput( "savepointResult"); harness.processAll(); -Assert.assertEquals(expectEndInput, TestBoundedOneInputStreamOperator.isInputEnded()); + assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput); } @Test -public void testCleanUpExceptionSuppressing() throws Exception { +void testCleanUpExceptionSuppressing() throws Exception { try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO) .addInput(STRING_TYPE_INFO) .setupOutputForSingletonOperatorChain(new FailingTwiceOperator()) .build()) { -try { -testHarness.processElement(new StreamRecord<>("Doesn't matter", 0)); -throw new RuntimeException("Expected an exception but ran successfully"); -} catch (Exception ex) { -ExceptionUtils.assertThrowable(ex, ExpectedTestException.class); -} +assertThatThrownBy( +() -> +testHarness.processElement( +new StreamRecord<>("Doesn't matter", 0))) +.satisfies( +(Consumer) +throwable -> +ExceptionUtils.assertThrowable( +throwable, ExpectedTestException.class)); Review Comment: ```suggestion .isInstanceOf(ExpectedTestException.class); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
dmvk commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1117221334 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java: ## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.OperatorIDPair; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.KeyedStateHandle; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toMap; + +/** Managed Keyed State size estimates used to make scheduling decisions. 
*/ +class StateSizeEstimates { Review Comment: sounds good
[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
dmvk commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1117220303 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java: ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups equally. 
*/ +public class DefaultSlotAssigner implements SlotAssigner { + +@Override +public Collection assignSlots( +JobInformation jobInformation, +Collection freeSlots, +VertexParallelism vertexParallelism, +AllocationsInfo previousAllocations, +StateSizeEstimates stateSizeEstimates) { +List allGroups = new ArrayList<>(); +for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) { + allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup)); +} + +Iterator iterator = freeSlots.iterator(); +Collection assignments = new ArrayList<>(); +for (ExecutionSlotSharingGroup group : allGroups) { +assignments.add(new SlotAssignment(iterator.next(), group)); Review Comment: Sounds good, I'd also be fine with adding a simple precondition ``` Preconditions.checkState(freeSlots.size() > allGroups.size(), "..."); ``` ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java: ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups equally. */ +public class DefaultSlotAssigner implements SlotAssigner { + +@Override +public Collection assignSlots( +JobInformation jobInformation, +Collection freeSlots, +VertexParallelism vertexParallelism, +AllocationsInfo previousAllocations, +
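The precondition suggested above guards the `iterator.next()` call in the assignment loop, which would otherwise throw `NoSuchElementException` mid-assignment when there are fewer free slots than execution slot sharing groups. A simplified, self-contained sketch of that loop — plain strings standing in for `SlotInfo` and `ExecutionSlotSharingGroup`, under the assumption that at least one slot is needed per group:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Simplified sketch of the slot-to-group assignment loop with a guard. */
public class SlotAssignmentSketch {

    // Plain strings stand in for SlotInfo and ExecutionSlotSharingGroup.
    static List<String> assign(List<String> freeSlots, List<String> groups) {
        // Guard in the spirit of the suggested Preconditions.checkState:
        // without it, it.next() below throws NoSuchElementException.
        if (freeSlots.size() < groups.size()) {
            throw new IllegalStateException("not enough free slots for all groups");
        }
        Iterator<String> it = freeSlots.iterator();
        List<String> assignments = new ArrayList<>();
        for (String group : groups) {
            assignments.add(it.next() + " -> " + group);
        }
        return assignments;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("s1", "s2", "s3"), List.of("g1", "g2")));
    }
}
```

A precondition turns a confusing iterator failure into an explicit invariant violation at the method boundary, which is the readability win the reviewer is after.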
[GitHub] [flink] 1996fanrui commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring
1996fanrui commented on code in PR #21999: URL: https://github.com/apache/flink/pull/21999#discussion_r1117218805 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java: ## @@ -362,30 +358,35 @@ private void testSyncSavepointWithEndInput( "savepointResult"); harness.processAll(); -Assert.assertEquals(expectEndInput, TestBoundedOneInputStreamOperator.isInputEnded()); + assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput); } @Test -public void testCleanUpExceptionSuppressing() throws Exception { +void testCleanUpExceptionSuppressing() throws Exception { try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO) .addInput(STRING_TYPE_INFO) .setupOutputForSingletonOperatorChain(new FailingTwiceOperator()) .build()) { -try { -testHarness.processElement(new StreamRecord<>("Doesn't matter", 0)); -throw new RuntimeException("Expected an exception but ran successfully"); -} catch (Exception ex) { -ExceptionUtils.assertThrowable(ex, ExpectedTestException.class); -} +assertThatThrownBy( +() -> +testHarness.processElement( +new StreamRecord<>("Doesn't matter", 0))) +.satisfies( +(Consumer) +throwable -> +ExceptionUtils.assertThrowable( +throwable, ExpectedTestException.class)); -try { -testHarness.finishProcessing(); -} catch (Exception ex) { -// todo: checking for suppression if there are more exceptions during cleanup -ExceptionUtils.assertThrowable(ex, FailingTwiceOperator.CloseException.class); -} +// todo: checking for suppression if there are more exceptions during cleanup +assertThatThrownBy(testHarness::finishProcessing) +.satisfies( +(ThrowingConsumer) +throwable -> +ExceptionUtils.assertThrowable( +throwable, + FailingTwiceOperator.CloseException.class)); Review Comment: ```suggestion .isInstanceOf(FailingTwiceOperator.CloseException.class); ``` I have tried it, it works. 
[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
dmvk commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1117216690 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocationsInfo.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; + +import static java.util.Collections.emptyMap; + +class AllocationsInfo { Review Comment: nit: it would be great to make the naming consistent with `JobInformation`; e.g. calling the outer class `JobAllocationInformation` and the inner one `VertexAllocationInformation`. WDYT? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
dmvk commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1117209507 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java: ## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.PriorityQueue; + +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; +import static org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.createExecutionSlotSharingGroups; + +/** A {@link SlotAssigner} that assigns slots based on the number of local key groups. 
 */
@Internal
public class StateLocalitySlotAssigner implements SlotAssigner {

    private static class AllocationScore implements Comparable<AllocationScore> {

        private final String group;
        private final AllocationID allocationId;

        public AllocationScore(String group, AllocationID allocationId, long score) {
            this.group = group;
            this.allocationId = allocationId;
            this.score = score;
        }

        private final long score;

        public String getGroup() {
            return group;
        }

        public AllocationID getAllocationId() {
            return allocationId;
        }

        public long getScore() {
            return score;
        }

        @Override
        public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
            int result = Long.compare(score, other.score);
            if (result != 0) {
                return result;
            }
            result = other.allocationId.compareTo(allocationId);
            if (result != 0) {
                return result;
            }
            return other.group.compareTo(group);
        }
    }

    @Override
    public Collection<SlotAssignment> assignSlots(
            JobInformation jobInformation,
            Collection<? extends SlotInfo> freeSlots,
            VertexParallelism vertexParallelism,
            AllocationsInfo previousAllocations,
            StateSizeEstimates stateSizeEstimates) {
        final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
        for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
            allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
        }
        final Map<JobVertexID, Integer> parallelism = getParallelism(allGroups);

        // PQ orders the pairs (allocationID, groupID) by score, decreasing
        // the score is computed as the potential amount of state that would reside locally
        final PriorityQueue<AllocationScore> scores =
                new PriorityQueue<>(Comparator.reverseOrder());
        for (ExecutionSlotSharingGroup group : allGroups) {
            calculateScore(
                            group,
                            parallelism,
                            jobInformation,
                            previousAllocations,
                            stateSizeEstimates)
                    .entrySet().stream()
                    .map(e -> new AllocationScore(group.getId(), e.getKey(), e.getValue()))
                    .forEach(scores::add);
        }
...
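The priority-queue ordering described in the quoted diff can be sketched in isolation. This is a simplified, illustrative stand-in (not the actual Flink class): `AllocationID` is replaced by a plain `String`, and only the score-ordering behavior is shown. The key point is that `Comparator.reverseOrder()` over the natural (ascending) `compareTo` turns the `PriorityQueue` into a max-heap, so the assignment with the highest potential local state is polled first:

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Simplified stand-in for the AllocationScore in the quoted diff: pairs
// (groupId, allocationId) are ordered primarily by score; the remaining
// fields are compared afterwards so the ordering is total and deterministic.
class AllocationScore implements Comparable<AllocationScore> {
    final String groupId;
    final String allocationId; // stands in for AllocationID
    final long score;

    AllocationScore(String groupId, String allocationId, long score) {
        this.groupId = groupId;
        this.allocationId = allocationId;
        this.score = score;
    }

    @Override
    public int compareTo(AllocationScore other) {
        int result = Long.compare(score, other.score);
        if (result != 0) {
            return result;
        }
        result = other.allocationId.compareTo(allocationId);
        if (result != 0) {
            return result;
        }
        return other.groupId.compareTo(groupId);
    }
}

public class ScoreOrderingDemo {
    public static void main(String[] args) {
        // reverseOrder() flips the natural ascending order into a max-heap:
        // the highest score is polled first.
        PriorityQueue<AllocationScore> scores =
                new PriorityQueue<>(Comparator.reverseOrder());
        scores.add(new AllocationScore("g1", "a1", 10));
        scores.add(new AllocationScore("g2", "a2", 30));
        scores.add(new AllocationScore("g3", "a3", 20));
        System.out.println(scores.poll().score); // prints 30 (highest score first)
    }
}
```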
[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
dmvk commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1117208355 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java: ## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.OperatorIDPair; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.KeyedStateHandle; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toMap; + +/** Managed Keyed State size estimates used to make scheduling decisions. 
 */
class StateSizeEstimates {
    private final Map<JobVertexID, Long> averages;

    public StateSizeEstimates() {
        this(Collections.emptyMap());
    }

    public StateSizeEstimates(Map<JobVertexID, Long> averages) {
        this.averages = averages;
    }

    public Optional<Long> estimate(JobVertexID jobVertexId) {
        return Optional.ofNullable(averages.get(jobVertexId));
    }

    static StateSizeEstimates empty() {
        return new StateSizeEstimates();
    }

    static StateSizeEstimates fromGraph(@Nullable ExecutionGraph executionGraph) {
        return Optional.ofNullable(executionGraph)
                .flatMap(graph -> Optional.ofNullable(graph.getCheckpointCoordinator()))
                .flatMap(coordinator -> Optional.ofNullable(coordinator.getCheckpointStore()))
                .flatMap(store -> Optional.ofNullable(store.getLatestCheckpoint()))
                .map(
                        cp ->
                                build(
                                        fromCompletedCheckpoint(cp),
                                        mapVerticesToOperators(executionGraph)))
                .orElse(empty());
    }

    private static StateSizeEstimates build(
            Map<OperatorID, Long> sizePerOperator,
            Map<JobVertexID, Set<OperatorID>> verticesToOperators) {
        Map<JobVertexID, Long> verticesToSizes =
                verticesToOperators.entrySet().stream()
                        .collect(
                                toMap(Map.Entry::getKey, e -> size(e.getValue(), sizePerOperator)));
        return new StateSizeEstimates(verticesToSizes);
    }

    private static long size(Set<OperatorID> ids, Map<OperatorID, Long> sizes) {
        return ids.stream()
                .mapToLong(key -> sizes.getOrDefault(key, 0L))
                .boxed()
                .reduce(Long::sum)
                .orElse(0L);
    }

    private static Map<JobVertexID, Set<OperatorID>> mapVerticesToOperators(
            ExecutionGraph executionGraph) {
        return executionGraph.getAllVertices().entrySet().stream()
                .collect(toMap(Map.Entry::getKey, e -> getOperatorIDS(e.getValue())));
    }

    private static Set<OperatorID> getOperatorIDS(ExecutionJobVertex v) {
        return v.getOperatorIDs().stream()
                .map(OperatorIDPair::getGeneratedOperatorID)
                .collect(Collectors.toSet());
    }

    private static Map<OperatorID, Long> fromCompletedCheckpoint(CompletedCheckpoint cp) {
        Stream<Map.Entry<OperatorID, OperatorState>> states =
                cp.getOperatorStates().entrySet().stream();
        Map<OperatorID, Long> estimates =
                states.collect(
                        toMap(Map.Entry::getKey, e -> estimateKeyGroupStateSize(e.getValue())));
        return estimates;
    }

    private static long estimateKeyGroupStateSize(OperatorState state) {
        Stream<KeyedStateHandle> handles =
                state.getSubtaskStates().values().stream()
                        .flatMap(s -> s.getManagedKeyedState().stream());
        Stream<Tuple2<Long, Integer>> sizeAndCount =
                handles.map(
                        h ->
...
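The `StateSizeEstimates` code quoted above rolls per-operator state sizes up to their job vertex. A minimal, self-contained sketch of that aggregation step follows — plain `String` identifiers stand in for `JobVertexID`/`OperatorID`, and the sample numbers are invented for illustration:

```java
import java.util.Map;
import java.util.Set;
import static java.util.stream.Collectors.toMap;

public class StateSizeAggregationDemo {
    /** Sum the estimated sizes of all operators belonging to one vertex;
     *  operators with no recorded size contribute 0, as in the quoted diff. */
    static long size(Set<String> operatorIds, Map<String, Long> sizes) {
        return operatorIds.stream().mapToLong(id -> sizes.getOrDefault(id, 0L)).sum();
    }

    public static void main(String[] args) {
        // Per-operator estimates (e.g. derived from the latest completed checkpoint)
        Map<String, Long> sizePerOperator = Map.of("op1", 100L, "op2", 50L);
        // Which operators are chained into which job vertex
        Map<String, Set<String>> verticesToOperators =
                Map.of("v1", Set.of("op1", "op2"), "v2", Set.of("op3"));

        // vertex -> summed state size, mirroring StateSizeEstimates#build
        Map<String, Long> verticesToSizes =
                verticesToOperators.entrySet().stream()
                        .collect(toMap(Map.Entry::getKey,
                                e -> size(e.getValue(), sizePerOperator)));

        System.out.println(verticesToSizes.get("v1")); // prints 150
        System.out.println(verticesToSizes.get("v2")); // prints 0 (op3 has no estimate)
    }
}
```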
[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
dmvk commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1117202797 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java: ## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.OperatorIDPair; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.KeyedStateHandle; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toMap; + +/** Managed Keyed State size estimates used to make scheduling decisions. 
 */
class StateSizeEstimates {
    private final Map<JobVertexID, Long> averages;

    public StateSizeEstimates() {
        this(Collections.emptyMap());
    }

    public StateSizeEstimates(Map<JobVertexID, Long> averages) {
        this.averages = averages;
    }

    public Optional<Long> estimate(JobVertexID jobVertexId) {
        return Optional.ofNullable(averages.get(jobVertexId));
    }

    static StateSizeEstimates empty() {
        return new StateSizeEstimates();
    }

    static StateSizeEstimates fromGraph(@Nullable ExecutionGraph executionGraph) {
        return Optional.ofNullable(executionGraph)
                .flatMap(graph -> Optional.ofNullable(graph.getCheckpointCoordinator()))
                .flatMap(coordinator -> Optional.ofNullable(coordinator.getCheckpointStore()))
                .flatMap(store -> Optional.ofNullable(store.getLatestCheckpoint()))
                .map(
                        cp ->
                                build(
                                        fromCompletedCheckpoint(cp),
                                        mapVerticesToOperators(executionGraph)))
                .orElse(empty());
    }

    private static StateSizeEstimates build(
            Map<OperatorID, Long> sizePerOperator,
            Map<JobVertexID, Set<OperatorID>> verticesToOperators) {
        Map<JobVertexID, Long> verticesToSizes =
                verticesToOperators.entrySet().stream()
                        .collect(
                                toMap(Map.Entry::getKey, e -> size(e.getValue(), sizePerOperator)));
        return new StateSizeEstimates(verticesToSizes);
    }

    private static long size(Set<OperatorID> ids, Map<OperatorID, Long> sizes) {
        return ids.stream()
                .mapToLong(key -> sizes.getOrDefault(key, 0L))
                .boxed()
                .reduce(Long::sum)
                .orElse(0L);
    }

    private static Map<JobVertexID, Set<OperatorID>> mapVerticesToOperators(
            ExecutionGraph executionGraph) {
        return executionGraph.getAllVertices().entrySet().stream()
                .collect(toMap(Map.Entry::getKey, e -> getOperatorIDS(e.getValue())));
    }

    private static Set<OperatorID> getOperatorIDS(ExecutionJobVertex v) {
        return v.getOperatorIDs().stream()
                .map(OperatorIDPair::getGeneratedOperatorID)
                .collect(Collectors.toSet());
    }

    private static Map<OperatorID, Long> fromCompletedCheckpoint(CompletedCheckpoint cp) {
        Stream<Map.Entry<OperatorID, OperatorState>> states =
                cp.getOperatorStates().entrySet().stream();
        Map<OperatorID, Long> estimates =
                states.collect(
                        toMap(Map.Entry::getKey, e -> estimateKeyGroupStateSize(e.getValue())));
        return estimates;
    }

    private static long estimateKeyGroupStateSize(OperatorState state) {
        Stream<KeyedStateHandle> handles =
                state.getSubtaskStates().values().stream()
                        .flatMap(s -> s.getManagedKeyedState().stream());
        Stream<Tuple2<Long, Integer>> sizeAndCount =
                handles.map(
                        h ->
...
[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
dmvk commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1117189073 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java: ## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.PriorityQueue; + +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; +import static org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.createExecutionSlotSharingGroups; + +/** A {@link SlotAssigner} that assigns slots based on the number of local key groups. 
 */
@Internal
public class StateLocalitySlotAssigner implements SlotAssigner {

    private static class AllocationScore implements Comparable<AllocationScore> {

        private final String group;
        private final AllocationID allocationId;

        public AllocationScore(String group, AllocationID allocationId, long score) {
            this.group = group;
            this.allocationId = allocationId;
            this.score = score;
        }

        private final long score;

        public String getGroup() {
            return group;
        }

        public AllocationID getAllocationId() {
            return allocationId;
        }

        public long getScore() {
            return score;
        }

        @Override
        public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
            int result = Long.compare(score, other.score);
            if (result != 0) {
                return result;
            }
            result = other.allocationId.compareTo(allocationId);
            if (result != 0) {
                return result;
            }
            return other.group.compareTo(group);
        }
    }

    @Override
    public Collection<SlotAssignment> assignSlots(
            JobInformation jobInformation,
            Collection<? extends SlotInfo> freeSlots,
            VertexParallelism vertexParallelism,
            AllocationsInfo previousAllocations,
            StateSizeEstimates stateSizeEstimates) {
        final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
        for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
            allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
        }
        final Map<JobVertexID, Integer> parallelism = getParallelism(allGroups);

        // PQ orders the pairs (allocationID, groupID) by score, decreasing
        // the score is computed as the potential amount of state that would reside locally
        final PriorityQueue<AllocationScore> scores =
                new PriorityQueue<>(Comparator.reverseOrder());
        for (ExecutionSlotSharingGroup group : allGroups) {
            calculateScore(
                            group,
                            parallelism,
                            jobInformation,
                            previousAllocations,
                            stateSizeEstimates)
                    .entrySet().stream()
                    .map(e -> new AllocationScore(group.getId(), e.getKey(), e.getValue()))
                    .forEach(scores::add);
        }
...
[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
dmvk commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1117187236 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java: ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment; +import org.apache.flink.runtime.scheduler.adaptive.allocator.AllocationsInfo.JobVertexAllocationInfo; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.stream.Collectors; + +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; +import static org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.createExecutionSlotSharingGroups; + +/** A {@link SlotAssigner} that assigns slots based on the number of local key groups. */ +@Internal +public class StateLocalitySlotAssigner implements SlotAssigner { + +private static class AllocationScore implements Comparable { + +private final String group; Review Comment: ```suggestion private final String groupId; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink-web] mxm commented on a diff in pull request #613: Kubernetes Operator 1.4.0 blogpost
mxm commented on code in PR #613: URL: https://github.com/apache/flink-web/pull/613#discussion_r1117121658 ## docs/content/posts/2023-02-27-release-kubernetes-operator-1.4.0.md: ## @@ -0,0 +1,77 @@ +--- +title: "Apache Flink Kubernetes Operator 1.4.0 Release Announcement" +date: "2023-02-27T08:00:00.000Z" +authors: +- gyfora: + name: "Gyula Fora" + twitter: "GyulaFora" +- mxm: + name: "Max Michels" + twitter: "stadtlegende" +aliases: +- /news/2023/02/27/release-kubernetes-operator-1.4.0.html +--- + +We are proud to announce the latest stable release of the operator. In addition to the expected stability improvements and fixes, the 1.4.0 release introduces the first version of the long-awaited autoscaler module. + +## Flink Streaming Job Autoscaler + +A highly requested feature for Flink applications is the ability to scale the pipeline based on incoming data load and other performance metrics. While Flink has already provided some of the required building blocks, this feature has not yet been realised in the open source ecosystem. Review Comment: ```suggestion A highly requested feature for Flink applications is the ability to scale the pipeline based on incoming data load and the utilization of the dataflow. While Flink has already provided some of the required building blocks, this feature has not yet been realized in the open source ecosystem. 
``` ## docs/content/posts/2023-02-27-release-kubernetes-operator-1.4.0.md: ## @@ -0,0 +1,77 @@ +--- +title: "Apache Flink Kubernetes Operator 1.4.0 Release Announcement" +date: "2023-02-27T08:00:00.000Z" +authors: +- gyfora: + name: "Gyula Fora" + twitter: "GyulaFora" +- mxm: + name: "Max Michels" Review Comment: NIT ```suggestion name: "Maximilian Michels" ``` ## docs/content/posts/2023-02-27-release-kubernetes-operator-1.4.0.md: ## @@ -0,0 +1,77 @@ +--- +title: "Apache Flink Kubernetes Operator 1.4.0 Release Announcement" +date: "2023-02-27T08:00:00.000Z" +authors: +- gyfora: + name: "Gyula Fora" + twitter: "GyulaFora" +- mxm: + name: "Max Michels" + twitter: "stadtlegende" +aliases: +- /news/2023/02/27/release-kubernetes-operator-1.4.0.html +--- + +We are proud to announce the latest stable release of the operator. In addition to the expected stability improvements and fixes, the 1.4.0 release introduces the first version of the long-awaited autoscaler module. + +## Flink Streaming Job Autoscaler + +A highly requested feature for Flink applications is the ability to scale the pipeline based on incoming data load and other performance metrics. While Flink has already provided some of the required building blocks, this feature has not yet been realised in the open source ecosystem. + +With [FLIP-271](https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling) the community set out to build such autoscaler component as part of the Kubernetes Operator subproject. The Kubernetes Operator proved to be a great place for the autoscaler module as it already contains all the necessary bits for managing and upgrading production streaming applications. + +Fast-forward to the 1.4.0 release and we now have the first fully functional autoscaler implementation in the operator, ready to be tested and used in production applications. 
For more, detailed information, please refer to the [Autoscaler Documentation](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.4/docs/custom-resource/autoscaler/). + +### Overview + +The autoscaler uses Flink source and operator vertex metrics to efficiently and independently scale the job vertexes of the streaming pipeline. + Review Comment: ```suggestion The used metrics include: Source metrics: - number of pending records (source only) - number of partitions (source only) - ingestion rate (source only) - processing rate - time spent processing (utilization) ``` ## docs/content/posts/2023-02-27-release-kubernetes-operator-1.4.0.md: ## @@ -0,0 +1,77 @@ +--- +title: "Apache Flink Kubernetes Operator 1.4.0 Release Announcement" +date: "2023-02-27T08:00:00.000Z" +authors: +- gyfora: + name: "Gyula Fora" + twitter: "GyulaFora" +- mxm: + name: "Max Michels" + twitter: "stadtlegende" +aliases: +- /news/2023/02/27/release-kubernetes-operator-1.4.0.html +--- + +We are proud to announce the latest stable release of the operator. In addition to the expected stability improvements and fixes, the 1.4.0 release introduces the first version of the long-awaited autoscaler module. + +## Flink Streaming Job Autoscaler + +A highly requested feature for Flink applications is the ability to scale the pipeline based on incoming data load and other performance metrics. While Flink has already provided some of the required building blocks, this feature has not yet been realised in the open source ecosystem. + +With
[jira] [Created] (FLINK-31217) Update netty to current
Morey Straus created FLINK-31217: Summary: Update netty to current Key: FLINK-31217 URL: https://issues.apache.org/jira/browse/FLINK-31217 Project: Flink Issue Type: Technical Debt Reporter: Morey Straus Netty 3.10.6 has several vulnerabilities and needs updating to the current release. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31216) Update kryo to current
Morey Straus created FLINK-31216: Summary: Update kryo to current Key: FLINK-31216 URL: https://issues.apache.org/jira/browse/FLINK-31216 Project: Flink Issue Type: Technical Debt Reporter: Morey Straus kryo 2.24 is several years out of date and has a deserialization vulnerability associated with it. Please update to current. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31215) Backpropagate processing rate limits from non-scalable bottlenecks to upstream operators
Gyula Fora created FLINK-31215: -- Summary: Backpropagate processing rate limits from non-scalable bottlenecks to upstream operators Key: FLINK-31215 URL: https://issues.apache.org/jira/browse/FLINK-31215 Project: Flink Issue Type: New Feature Components: Autoscaler, Kubernetes Operator Reporter: Gyula Fora The current algorithm scales operators based on input data rates, propagating them forward through the graph. However, there are cases where a certain operator's processing capacity is limited, either because it has a set maxParallelism, because the user excludes it from scaling, or because its capacity otherwise doesn't increase with scaling. In these cases it doesn't make sense to scale upstream operators to the target data rate if the job is going to be bottlenecked by a downstream operator; instead, we should backpropagate the limit from the non-scalable bottleneck.
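The backpropagation idea in FLINK-31215 can be illustrated on a linear operator chain. This is only a sketch under invented assumptions (the ticket does not specify an algorithm): each operator feeds the next, one record in produces one record out, and a non-scalable operator has a fixed capacity. Walking the chain from sink to source and capping each target rate by the tightest downstream capacity keeps upstream operators from being over-provisioned:

```java
public class RateBackpropagationDemo {
    /**
     * Walk a linear operator chain from sink to source and cap each
     * operator's target processing rate by the minimum capacity of all
     * operators downstream of it (hypothetical model, 1-in-1-out).
     */
    static double[] backpropagate(double[] targetRates, double[] capacities) {
        double[] limited = targetRates.clone();
        double cap = Double.POSITIVE_INFINITY;
        for (int i = limited.length - 1; i >= 0; i--) {
            cap = Math.min(cap, capacities[i]); // tightest bottleneck so far
            limited[i] = Math.min(limited[i], cap);
        }
        return limited;
    }

    public static void main(String[] args) {
        // The source sees 1000 rec/s of input, but a non-scalable sink
        // (e.g. capped maxParallelism) can only process 400 rec/s.
        double[] target = {1000, 1000, 1000};
        double[] capacity = {Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, 400};
        double[] result = backpropagate(target, capacity);
        System.out.println(result[0]); // prints 400.0 — upstream no longer over-scaled
    }
}
```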
[jira] [Assigned] (FLINK-30571) Compute scale parallelism based on observed scalability
[ https://issues.apache.org/jira/browse/FLINK-30571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora reassigned FLINK-30571: -- Assignee: (was: Gyula Fora) > Compute scale parallelism based on observed scalability > > > Key: FLINK-30571 > URL: https://issues.apache.org/jira/browse/FLINK-30571 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Gyula Fora >Priority: Major > > When computing target parallelism for job vertices we currently assume linear > scaling with a fixed (1) coefficient. > This assumes that in order to double the capacity we simply double the > parallelism. > While linearity already might be violated by many real time workloads this > form of strong linearity rarely holds due to the overhead of increased > network traffic, coordination etc. > As we can access past (parallelism, processingRate) information based on the > scaling history we should estimate the scalability coefficient either using a > simple or weighted linear regression. -- This message was sent by Atlassian Jira (v8.20.10#820010)
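FLINK-30571 above proposes estimating a scalability coefficient from past (parallelism, processingRate) observations via simple or weighted linear regression. A minimal sketch of the weighted fit, assuming a model of the form processingRate ≈ coefficient × parallelism through the origin (the ticket does not fix the exact model, so this is illustrative only):

```java
public class ScalabilityRegressionDemo {
    /**
     * Weighted least-squares fit of rate ≈ coefficient * parallelism through
     * the origin: coefficient = Σ w·p·r / Σ w·p². Illustrative model only.
     */
    static double fitCoefficient(double[] parallelism, double[] rate, double[] weight) {
        double num = 0, den = 0;
        for (int i = 0; i < parallelism.length; i++) {
            num += weight[i] * parallelism[i] * rate[i];
            den += weight[i] * parallelism[i] * parallelism[i];
        }
        return num / den;
    }

    public static void main(String[] args) {
        // Invented history: doubling parallelism yielded less than 2x throughput,
        // i.e. scaling is sub-linear due to network/coordination overhead.
        double[] p = {2, 4, 8};
        double[] r = {200, 360, 650};
        double[] w = {1, 1, 2}; // weight the most recent observation more
        double coefficient = fitCoefficient(p, r, w);
        System.out.println(Math.round(coefficient * 100) / 100.0); // ~rec/s per parallelism unit
    }
}
```

With such a fitted coefficient, the target parallelism for a required rate would be estimated as requiredRate / coefficient rather than assuming a fixed coefficient of 1.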
[GitHub] [flink] Samrat002 commented on a diff in pull request #21770: [FLINK-30277][python]Allow PYTHONPATH of Python Worker configurable
Samrat002 commented on code in PR #21770: URL: https://github.com/apache/flink/pull/21770#discussion_r1117029594 ## flink-python/src/main/java/org/apache/flink/python/PythonOptions.java: ## @@ -122,6 +122,19 @@ public class PythonOptions { + "optional parameter exists. The option is equivalent to the command line option " + "\"-pyreq\"."); +/** The configuration allows user to define python path for client and workers. */ +public static final ConfigOption PYTHON_PATH = +ConfigOptions.key("env.PYTHONPATH") +.stringType() +.noDefaultValue() +.withDescription( +Description.builder() +.text( +"Specify the path on the Worker Node where the Flink Python Dependencies are installed, which " ++ "gets added into the PYTHONPATH of the Python Worker. " ++ "The option is equivalent to the command line option \"-penv.PYTHONPATH\".") Review Comment: created a separate sub tasks to provide command line support [FLINK-31214](https://issues.apache.org/jira/browse/FLINK-31214). Will raise a different pr for the same . Currently removing it from description -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-31214) Add support for new command line option -py.pythonpath
Samrat Deb created FLINK-31214: -- Summary: Add support for new command line option -py.pythonpath Key: FLINK-31214 URL: https://issues.apache.org/jira/browse/FLINK-31214 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Samrat Deb Fix For: 1.17.0, 1.18.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] Samrat002 commented on a diff in pull request #21770: [FLINK-30277][python]Allow PYTHONPATH of Python Worker configurable
Samrat002 commented on code in PR #21770: URL: https://github.com/apache/flink/pull/21770#discussion_r1117029594 ## flink-python/src/main/java/org/apache/flink/python/PythonOptions.java: ## @@ -122,6 +122,19 @@ public class PythonOptions { + "optional parameter exists. The option is equivalent to the command line option " + "\"-pyreq\"."); +/** The configuration allows user to define python path for client and workers. */ +public static final ConfigOption PYTHON_PATH = +ConfigOptions.key("env.PYTHONPATH") +.stringType() +.noDefaultValue() +.withDescription( +Description.builder() +.text( +"Specify the path on the Worker Node where the Flink Python Dependencies are installed, which " ++ "gets added into the PYTHONPATH of the Python Worker. " ++ "The option is equivalent to the command line option \"-penv.PYTHONPATH\".") Review Comment: yes ! wanted to add commandline option for user. i will create a separate jira for that purpose . i will remove the description for now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] chucheng92 commented on pull request #21904: [FLINK-30935][connectors/kafka] Add Kafka serializers deserialize check when using SimpleVersionedSerializer
chucheng92 commented on PR #21904: URL: https://github.com/apache/flink/pull/21904#issuecomment-1443789397 Hi @twalthr Timo, could you help me review this? It's a small improvement.
[GitHub] [flink-kubernetes-operator] gauravmiglanid11 commented on a diff in pull request #533: [FLINK-30920] [AutoScaler] adds exclude vertex ids for autoscaler to ignore for evaluation and scaling
gauravmiglanid11 commented on code in PR #533: URL: https://github.com/apache/flink-kubernetes-operator/pull/533#discussion_r1117117865 ## flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java: ## @@ -222,19 +223,30 @@ private Map computeScalingSummary( Map> scalingHistory) { var out = new HashMap(); +var excludeVertexIdList = conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS); evaluatedMetrics.forEach( (v, metrics) -> { -var currentParallelism = -(int) metrics.get(ScalingMetric.PARALLELISM).getCurrent(); -var newParallelism = -jobVertexScaler.computeScaleTargetParallelism( -resource, -conf, +if (excludeVertexIdList.contains(v.toHexString())) { +LOG.debug( +"Vertex {} is part of `vertex.exclude.ids` config, Ignoring it for scaling", +v); Review Comment: Got the issue, thanks. Fixing it.
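The review thread above is about skipping excluded vertices when the autoscaler computes its scaling summary. A minimal, Flink-free sketch of the same guard follows; the class, record, and method names here are illustrative stand-ins, not the operator's actual types:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExcludeFilterSketch {
    // Stand-in for the evaluated metrics of one job vertex (illustrative only).
    record VertexMetrics(String vertexHexId, int currentParallelism) {}

    /**
     * Keeps only vertices that are not in the exclude list, mirroring the
     * `excludeVertexIdList.contains(v.toHexString())` guard in the diff above.
     */
    static Map<String, VertexMetrics> filterExcluded(
            Map<String, VertexMetrics> evaluated, List<String> excludeIds) {
        Map<String, VertexMetrics> out = new HashMap<>();
        evaluated.forEach((id, m) -> {
            if (excludeIds.contains(id)) {
                // The real operator logs and skips such vertices instead of scaling them.
                return;
            }
            out.put(id, m);
        });
        return out;
    }

    public static void main(String[] args) {
        Map<String, VertexMetrics> metrics = Map.of(
                "a1", new VertexMetrics("a1", 2),
                "b2", new VertexMetrics("b2", 4));
        Map<String, VertexMetrics> kept = filterExcluded(metrics, List.of("b2"));
        System.out.println(kept.containsKey("a1") && !kept.containsKey("b2")); // prints true
    }
}
```

As mxm's follow-up comment notes, the guard alone is not sufficient for non-sink vertices, since downstream target rates may still be derived from the ignored vertex.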
[GitHub] [flink-kubernetes-operator] mxm commented on a diff in pull request #533: [FLINK-30920] [AutoScaler] adds exclude vertex ids for autoscaler to ignore for evaluation and scaling
mxm commented on code in PR #533: URL: https://github.com/apache/flink-kubernetes-operator/pull/533#discussion_r1117086633 ## flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java: ## @@ -222,19 +223,30 @@ private Map computeScalingSummary( Map> scalingHistory) { var out = new HashMap(); +var excludeVertexIdList = conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS); evaluatedMetrics.forEach( (v, metrics) -> { -var currentParallelism = -(int) metrics.get(ScalingMetric.PARALLELISM).getCurrent(); -var newParallelism = -jobVertexScaler.computeScaleTargetParallelism( -resource, -conf, +if (excludeVertexIdList.contains(v.toHexString())) { +LOG.debug( +"Vertex {} is part of `vertex.exclude.ids` config, Ignoring it for scaling", +v); Review Comment: Can we make this info level? ## flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java: ## @@ -183,6 +184,29 @@ op1, evaluated(1, 70, 100), assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary)); } +@Test +public void testVertexesExclusionForScaling() { +var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a"; +var source = JobVertexID.fromHexString(sourceHexString); +var filterOperatorHexString = "869fb403873411306404e9f2e4438c0e"; +var filterOperator = JobVertexID.fromHexString(filterOperatorHexString); +var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7"; +var sink = JobVertexID.fromHexString(sinkHexString); + +var scalingInfo = new AutoScalerInfo(new HashMap<>()); +var metrics = +Map.of( +source, +evaluated(1, 70, 100), +filterOperator, +evaluated(1, 100, 80), +sink, +evaluated(1, 70, 80)); +// filter operator should not scale +conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of(filterOperatorHexString)); +assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); Review Comment: Can we test that it would have scaled if the 
configuration was not set? ## flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java: ## @@ -222,19 +223,30 @@ private Map computeScalingSummary( Map> scalingHistory) { var out = new HashMap(); +var excludeVertexIdList = conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS); evaluatedMetrics.forEach( (v, metrics) -> { -var currentParallelism = -(int) metrics.get(ScalingMetric.PARALLELISM).getCurrent(); -var newParallelism = -jobVertexScaler.computeScaleTargetParallelism( -resource, -conf, +if (excludeVertexIdList.contains(v.toHexString())) { +LOG.debug( +"Vertex {} is part of `vertex.exclude.ids` config, Ignoring it for scaling", +v); Review Comment: This still requires more work to ensure the feature works for non-sink vertices. If you ignore a non-sink vertex, the downstream operators from it might be scaled based on the target rates calculated for the ignored vertex. We need to make sure to set the target rates based on the current rate for the ignored vertex. Here: https://github.com/apache/flink-kubernetes-operator/blob/c6cd7cbb83c5257960daebe65214035a6732ad0d/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java#L91
[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring
RocMarshal commented on code in PR #21999: URL: https://github.com/apache/flink/pull/21999#discussion_r1117078618 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java: ## @@ -362,30 +358,35 @@ private void testSyncSavepointWithEndInput( "savepointResult"); harness.processAll(); -Assert.assertEquals(expectEndInput, TestBoundedOneInputStreamOperator.isInputEnded()); + assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput); } @Test -public void testCleanUpExceptionSuppressing() throws Exception { +void testCleanUpExceptionSuppressing() throws Exception { try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO) .addInput(STRING_TYPE_INFO) .setupOutputForSingletonOperatorChain(new FailingTwiceOperator()) .build()) { -try { -testHarness.processElement(new StreamRecord<>("Doesn't matter", 0)); -throw new RuntimeException("Expected an exception but ran successfully"); -} catch (Exception ex) { -ExceptionUtils.assertThrowable(ex, ExpectedTestException.class); -} +assertThatThrownBy( +() -> +testHarness.processElement( +new StreamRecord<>("Doesn't matter", 0))) +.satisfies( +(Consumer) +throwable -> +ExceptionUtils.assertThrowable( +throwable, ExpectedTestException.class)); -try { -testHarness.finishProcessing(); -} catch (Exception ex) { -// todo: checking for suppression if there are more exceptions during cleanup -ExceptionUtils.assertThrowable(ex, FailingTwiceOperator.CloseException.class); -} +// todo: checking for suppression if there are more exceptions during cleanup +assertThatThrownBy(testHarness::finishProcessing) +.satisfies( +(ThrowingConsumer) +throwable -> +ExceptionUtils.assertThrowable( +throwable, + FailingTwiceOperator.CloseException.class)); Review Comment: (screenshot: https://user-images.githubusercontent.com/64569824/221202347-41aeb24a-2324-46cb-9985-0a3e5595faf6.png) So, let's keep the current lines?
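The migration discussed in this thread replaces try/catch assertion blocks with AssertJ's `assertThatThrownBy(...).satisfies(...)`, which narrows the scope of code expected to throw. Outside of AssertJ, the same shape can be sketched with a small plain-Java helper; all names below are illustrative, not Flink test utilities:

```java
import java.util.function.Consumer;

public class ThrownBySketch {
    @FunctionalInterface
    interface ThrowingRunnable { void run() throws Exception; }

    /**
     * Runs the action, asserts that it throws, and hands the throwable to a
     * checker — the same shape as assertThatThrownBy(...).satisfies(...).
     */
    static void assertThrows(ThrowingRunnable action, Consumer<Throwable> checker) {
        try {
            action.run();
        } catch (Throwable t) {
            checker.accept(t);
            return;
        }
        throw new AssertionError("Expected an exception but ran successfully");
    }

    public static void main(String[] args) {
        // Only the lambda body is expected to throw; surrounding setup code is not.
        assertThrows(
                () -> { throw new IllegalStateException("boom"); },
                t -> {
                    if (!(t instanceof IllegalStateException)) {
                        throw new AssertionError("wrong exception type: " + t);
                    }
                });
        System.out.println("ok"); // prints "ok"
    }
}
```

The benefit over a broad try/catch, as noted in the review, is that a passing assertion inside the block cannot be silently swallowed by the catch clause.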
[jira] [Commented] (FLINK-31066) Release Testing: Verify FLINK-29932 Upgrade Calcite to 1.29.0
[ https://issues.apache.org/jira/browse/FLINK-31066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693216#comment-17693216 ] Matthias Pohl commented on FLINK-31066: --- I ran the sample code (once compiled with Flink release-1.16 and once compiled with release-1.17) on a Flink cluster based on the binaries of 1.16.1 and 1.17-SNAPSHOT (commit 21158c06). The only config change I added was the following:
{code}
state.backend: hashmap
state.backend.type: filesystem
{code}
> Release Testing: Verify FLINK-29932 Upgrade Calcite to 1.29.0 > - > > Key: FLINK-31066 > URL: https://issues.apache.org/jira/browse/FLINK-31066 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Sergey Nuyanzin >Assignee: Matthias Pohl >Priority: Major > Labels: release-testing > Attachments: SavepointReleaseTesting.java > > > In fact this is a task to check all 3 Calcite upgrade related issues (1.27.0, > 1.28.0 and 1.29.0) > Since there were added optimization for Sarg in Calcite 1.27.0 it would make > sense to check that different queries with Sarg operator are working ok. > Also would make sense to check that SQL jobs with Sarg related queries could > be restored from previous Flink version. > An example of SQL > {code:sql} > SELECT a FROM MyTable WHERE a = 1 or a = 2 or a IS NOT NULL;{code} > {code:sql} > SELECT a FROM MyTable WHERE a = 1 or a = 2 or a IS NULL; > {code} > where MyTable is for instance > {code:sql} > CREATE TABLE MyTable ( > a bigint, > b int not null, > c varchar, > d timestamp(3) > ) with (...) > {code}
[GitHub] [flink] snuyanzin commented on a diff in pull request #21993: [FLINK-31166][table] Fix array_contains does not support null argumen…
snuyanzin commented on code in PR #21993: URL: https://github.com/apache/flink/pull/21993#discussion_r1117070060 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java: ## @@ -71,7 +71,11 @@ public final class SpecificInputTypeStrategies { /** Argument type derived from the array element type. */ public static final ArgumentTypeStrategy ARRAY_ELEMENT_ARG = -new ArrayElementArgumentTypeStrategy(); +new ArrayElementArgumentTypeStrategy(false); + +/** Argument type derived from the array element type. But leaves nullability untouched. */ Review Comment: "Preserving" means keeping it the same. With the current implementation, if the input array is an array of non-null ints and the input parameter for the `ArrayElementArgumentTypeStrategy` constructor is `true`, then it will force-set it to nullable. So it does a different thing and is not preserving.
[jira] [Updated] (FLINK-31066) Release Testing: Verify FLINK-29932 Upgrade Calcite to 1.29.0
[ https://issues.apache.org/jira/browse/FLINK-31066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-31066: -- Attachment: SavepointReleaseTesting.java
[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring
RocMarshal commented on code in PR #21999: URL: https://github.com/apache/flink/pull/21999#discussion_r1117066514 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java: ## @@ -1103,35 +1143,41 @@ private void testFailToConfirmCheckpointMessage(Consumer> consu * finished. */ @Test -public void testCheckpointFailueOnClosedOperator() throws Throwable { +void testCheckpointFailueOnClosedOperator() throws Exception { ClosingOperator operator = new ClosingOperator<>(); StreamTaskMailboxTestHarnessBuilder builder = new StreamTaskMailboxTestHarnessBuilder<>( OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO) .addInput(BasicTypeInfo.INT_TYPE_INFO); try (StreamTaskMailboxTestHarness harness = builder.setupOutputForSingletonOperatorChain(operator).build()) { -// keeps the mailbox from suspending -harness.setAutoProcess(false); -harness.processElement(new StreamRecord<>(1)); - -harness.streamTask.operatorChain.finishOperators( -harness.streamTask.getActionExecutor(), StopMode.DRAIN); -harness.streamTask.operatorChain.closeAllOperators(); -assertTrue(ClosingOperator.closed.get()); - -harness.streamTask.triggerCheckpointOnBarrier( -new CheckpointMetaData(1, 0), -CheckpointOptions.forCheckpointWithDefaultLocation(), -new CheckpointMetricsBuilder()); -} catch (Exception ex) { -ExceptionUtils.assertThrowableWithMessage( -ex, "OperatorChain and Task should never be closed at this point"); +assertThatThrownBy( +() -> { +// keeps the mailbox from suspending +harness.setAutoProcess(false); +harness.processElement(new StreamRecord<>(1)); + + harness.streamTask.operatorChain.finishOperators( + harness.streamTask.getActionExecutor(), StopMode.DRAIN); + harness.streamTask.operatorChain.closeAllOperators(); + assertThat(ClosingOperator.closed.get()).isTrue(); Review Comment: Yes, the small scope could help devs to locate the lines with expected exception. I'd like it. -- This is an automated message from the Apache Git Service. 
[jira] [Comment Edited] (FLINK-31066) Release Testing: Verify FLINK-29932 Upgrade Calcite to 1.29.0
[ https://issues.apache.org/jira/browse/FLINK-31066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693212#comment-17693212 ] Matthias Pohl edited comment on FLINK-31066 at 2/24/23 2:15 PM: I attached the job to this Jira issue that I could use to generate different execution plans for 1.16.1 and 1.17-SNAPSHOT. 1.16.1: {code} [7]:Join(joinType=[InnerJoin], where=[(SEARCH(a, Sarg[100]) OR ((c = 200) AND (b = 400)))], select=[a, c, b], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) +- StreamingFileWriter +- Sink: end {code} 1.17-SNAPSHOT: {code} [7]:Join(joinType=[InnerJoin], where=[((a = 100) OR ((c = 200) AND (b = 400)))], select=[a, c, b], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) +- StreamingFileWriter +- Sink: end {code} Restoring the job from a savepoint that was generated with 1.16.1 resulted in the following error: {code} ./flink-1.17-21158c06-SNAPSHOT/bin/flink run -s /home/mapohl/research/FLINK-31066/run-1.16.1/savepoints/savepoint-ce617a-f674354c199c flink-examples-table_2.12-1.17-SNAPSHOT-SavepointReleaseTesting.jar $(pwd)/run-1.17-SNAPSHOT The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095) at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) at 
org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157) Caused by: org.apache.flink.table.api.TableException: Failed to execute sql at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:938) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:989) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765) at org.apache.flink.table.examples.java.basics.SavepointReleaseTesting.main(SavepointReleaseTesting.java:116) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ... 9 more Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'insert-into_default_catalog.default_database.sink_table'. at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2212) at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:189) at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:921) ... 18 more Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster. 
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321) at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at
[jira] [Commented] (FLINK-31066) Release Testing: Verify FLINK-29932 Upgrade Calcite to 1.29.0
[ https://issues.apache.org/jira/browse/FLINK-31066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693212#comment-17693212 ] Matthias Pohl commented on FLINK-31066: --- I attached the job to this Jira issue that I could use to generate different execution plans for 1.16.1 and 1.17-SNAPSHOT. 1.16.1: {code} {code} 1.17-SNAPSHOT: {code} {code} Restoring the job from a savepoint that was generated with 1.16.1 resulted in the following error: {code} ./flink-1.17-21158c06-SNAPSHOT/bin/flink run -s /home/mapohl/research/FLINK-31066/run-1.16.1/savepoints/savepoint-ce617a-f674354c199c flink-examples-table_2.12-1.17-SNAPSHOT-SavepointReleaseTesting.jar $(pwd)/run-1.17-SNAPSHOT The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095) at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157) Caused by: org.apache.flink.table.api.TableException: Failed to execute sql at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:938) at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:989) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765) at org.apache.flink.table.examples.java.basics.SavepointReleaseTesting.main(SavepointReleaseTesting.java:116) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ... 9 more Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'insert-into_default_catalog.default_database.sink_table'. at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2212) at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:189) at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:921) ... 18 more Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster. 
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321) at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster. at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) at
[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring
RocMarshal commented on code in PR #21999: URL: https://github.com/apache/flink/pull/21999#discussion_r1117060917 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java: ## @@ -1182,13 +1228,13 @@ public void testThreadInvariants() throws Throwable { }); runningTask.invocationFuture.get(); -assertThat( -runningTask.streamTask.getTaskClassLoader(), is(sameInstance(taskClassLoader))); + assertThat(runningTask.streamTask.getTaskClassLoader()).isSameAs(taskClassLoader); + assertThat(runningTask.streamTask.getTaskClassLoader()).isSameAs(taskClassLoader); Review Comment: yes, nice catch!
[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring
RocMarshal commented on code in PR #21999: URL: https://github.com/apache/flink/pull/21999#discussion_r1117058961 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java: ## @@ -1103,35 +1143,41 @@ private void testFailToConfirmCheckpointMessage(Consumer> consu * finished. */ @Test -public void testCheckpointFailueOnClosedOperator() throws Throwable { +void testCheckpointFailueOnClosedOperator() throws Exception { ClosingOperator operator = new ClosingOperator<>(); StreamTaskMailboxTestHarnessBuilder builder = new StreamTaskMailboxTestHarnessBuilder<>( OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO) .addInput(BasicTypeInfo.INT_TYPE_INFO); try (StreamTaskMailboxTestHarness harness = builder.setupOutputForSingletonOperatorChain(operator).build()) { -// keeps the mailbox from suspending -harness.setAutoProcess(false); -harness.processElement(new StreamRecord<>(1)); - -harness.streamTask.operatorChain.finishOperators( -harness.streamTask.getActionExecutor(), StopMode.DRAIN); -harness.streamTask.operatorChain.closeAllOperators(); -assertTrue(ClosingOperator.closed.get()); - -harness.streamTask.triggerCheckpointOnBarrier( -new CheckpointMetaData(1, 0), -CheckpointOptions.forCheckpointWithDefaultLocation(), -new CheckpointMetricsBuilder()); -} catch (Exception ex) { -ExceptionUtils.assertThrowableWithMessage( -ex, "OperatorChain and Task should never be closed at this point"); +assertThatThrownBy( +() -> { +// keeps the mailbox from suspending +harness.setAutoProcess(false); +harness.processElement(new StreamRecord<>(1)); + + harness.streamTask.operatorChain.finishOperators( + harness.streamTask.getActionExecutor(), StopMode.DRAIN); + harness.streamTask.operatorChain.closeAllOperators(); + assertThat(ClosingOperator.closed.get()).isTrue(); + +harness.streamTask.triggerCheckpointOnBarrier( +new CheckpointMetaData(1, 0), + CheckpointOptions.forCheckpointWithDefaultLocation(), +new CheckpointMetricsBuilder()); +}) +.satisfies( 
+(Consumer) +throwable -> + ExceptionUtils.assertThrowableWithMessage( +throwable, +"OperatorChain and Task should never be closed at this point")); Review Comment: I understand what you mean. I tried it and it failed, so what about keeping the current lines? Please let me know your opinion. (screenshot: https://user-images.githubusercontent.com/64569824/221199128-34ed3b53-1106-4b1f-b77a-5e7124fbd20c.png)
[GitHub] [flink] Samrat002 commented on a diff in pull request #21770: [FLINK-30277][python]Allow PYTHONPATH of Python Worker configurable
Samrat002 commented on code in PR #21770: URL: https://github.com/apache/flink/pull/21770#discussion_r1117042392 ## docs/layouts/shortcodes/generated/execution_config_configuration.html: ## @@ -15,7 +15,7 @@ The max number of async i/o operation that the async lookup join can trigger. -table.exec.async-lookup.output-mode Batch Streaming +table.exec.async-lookup.output-mode Batch Streaming Review Comment: reverted
[jira] [Assigned] (FLINK-12451) [Bitwise Functions] Add BIT_XOR function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-12451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser reassigned FLINK-12451: -- Assignee: Ran Tao > [Bitwise Functions] Add BIT_XOR function supported in Table API and SQL > --- > > Key: FLINK-12451 > URL: https://issues.apache.org/jira/browse/FLINK-12451 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Zhanchun Zhang >Assignee: Ran Tao >Priority: Major > Labels: auto-unassigned > > Bitwise XOR, returns an unsigned 64-bit integer. > e.g.: > SELECT 1 ^ 1; returns 0 > SELECT 1 ^ 0; returns 1 > SELECT 11 ^ 3; returns 8
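The examples from the issue description can be checked directly with Java's `^` operator, shown here only to illustrate the expected bitwise-XOR semantics (the actual deliverable is the `BIT_XOR` function in the Table API and SQL):

```java
public class BitXorDemo {
    public static void main(String[] args) {
        // Matches the examples in FLINK-12451's description.
        System.out.println(1 ^ 1);  // prints 0
        System.out.println(1 ^ 0);  // prints 1
        System.out.println(11 ^ 3); // prints 8 (1011 ^ 0011 = 1000)
    }
}
```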
[GitHub] [flink] Samrat002 commented on a diff in pull request #21770: [FLINK-30277][python]Allow PYTHONPATH of Python Worker configurable
Samrat002 commented on code in PR #21770: URL: https://github.com/apache/flink/pull/21770#discussion_r1117031077 ## flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java: ## @@ -100,13 +104,32 @@ public PythonDependencyInfo( @Nullable String requirementsCacheDir, @Nonnull Map archives, @Nonnull String pythonExec, -@Nonnull String executionMode) { +@Nullable String pythonPath) { +this( +pythonFiles, +requirementsFilePath, +requirementsCacheDir, +archives, +pythonExec, +PYTHON_EXECUTION_MODE.defaultValue(), +pythonPath); +} Review Comment: ack
[GitHub] [flink] Samrat002 commented on a diff in pull request #21770: [FLINK-30277][python]Allow PYTHONPATH of Python Worker configurable
Samrat002 commented on code in PR #21770: URL: https://github.com/apache/flink/pull/21770#discussion_r1117029594 ## flink-python/src/main/java/org/apache/flink/python/PythonOptions.java: ## @@ -122,6 +122,19 @@ public class PythonOptions { + "optional parameter exists. The option is equivalent to the command line option " + "\"-pyreq\"."); +/** The configuration allows user to define python path for client and workers. */ +public static final ConfigOption<String> PYTHON_PATH = +ConfigOptions.key("env.PYTHONPATH") +.stringType() +.noDefaultValue() +.withDescription( +Description.builder() +.text( +"Specify the path on the Worker Node where the Flink Python Dependencies are installed, which " ++ "gets added into the PYTHONPATH of the Python Worker. " ++ "The option is equivalent to the command line option \"-penv.PYTHONPATH\".") Review Comment: yes
[GitHub] [flink] Samrat002 commented on a diff in pull request #21770: [FLINK-30277][python]Allow PYTHONPATH of Python Worker configurable
Samrat002 commented on code in PR #21770: URL: https://github.com/apache/flink/pull/21770#discussion_r1117029166 ## docs/static/generated/rest_v1_dispatcher.yml: ## @@ -6,7 +6,7 @@ info: license: name: Apache 2.0 url: https://www.apache.org/licenses/LICENSE-2.0.html - version: v1/1.17-SNAPSHOT + version: v1/1.18-SNAPSHOT Review Comment: ack i will remove it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-31213) Aggregation merge engine supports retract inputs
Jingsong Lee created FLINK-31213: Summary: Aggregation merge engine supports retract inputs Key: FLINK-31213 URL: https://issues.apache.org/jira/browse/FLINK-31213 Project: Flink Issue Type: Improvement Components: Table Store Reporter: Jingsong Lee Assignee: Jingsong Lee Fix For: table-store-0.4.0 Sum can support retracts. The other aggregate functions do not support retraction (`UPDATE_BEFORE` and `DELETE`). If the user wants some functions to ignore retraction messages, they can configure: `'fields.${field_name}.ignore-retract'='true'`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
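The retract semantics described in the issue can be sketched in isolation (all names below are hypothetical, not Table Store API): sum subtracts on `UPDATE_BEFORE`/`DELETE`, while `'fields.${field_name}.ignore-retract'='true'` simply drops those messages.

```java
// Sketch of the proposed retract handling for a sum aggregation merge
// engine. All names are hypothetical, not Table Store API.
class SumAggregatorSketch {
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    private long sum;
    // corresponds to 'fields.${field_name}.ignore-retract' = 'true'
    private final boolean ignoreRetract;

    SumAggregatorSketch(boolean ignoreRetract) {
        this.ignoreRetract = ignoreRetract;
    }

    void accumulate(RowKind kind, long value) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                sum += value;
                break;
            case UPDATE_BEFORE:
            case DELETE:
                // sum naturally supports retraction by subtracting;
                // with ignore-retract the retraction message is dropped
                if (!ignoreRetract) {
                    sum -= value;
                }
                break;
        }
    }

    long result() {
        return sum;
    }
}
```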
[GitHub] [flink] zentol commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
zentol commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1117002036 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java: ## @@ -157,13 +165,21 @@ public Map calculateScore( .getMaxParallelism(), parallelism.get(evi.getJobVertexId()), evi.getSubtaskIndex()); +// Estimate state size per key group. For scoring, assume 1 if size estimate is 0 to +// accommodate for averaging non-zero states +Optional<Long> kgSizeMaybe = +stateSizeEstimates.estimate(evi.getJobVertexId()).map(e -> Math.max(e, 1L)); +if (!kgSizeMaybe.isPresent()) { +continue; +} Review Comment: > if we place the task on a different TM then it will have to download all its SST files (or am I missing something?) That part is clear. If incremental state is also covered (even if an operator didn't add any state to it in the latest checkpoint), then we're good. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
rkhachatryan commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1116992436 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocationsInfo.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; + +import static java.util.Collections.emptyMap; + +class AllocationsInfo { Review Comment: Implemented in 6d4971f8f646409ff09173f7dfe1d5245b95706f and 5fe039b4cb507fcfcc88906c40d86561fc9e8ed5 -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring
RocMarshal commented on code in PR #21999: URL: https://github.com/apache/flink/pull/21999#discussion_r1116991198 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java: ## @@ -945,12 +982,15 @@ public void testAsyncCheckpointingConcurrentCloseBeforeAcknowledge() throws Exce // make sure that all state handles have been discarded discardFuture.get(); -try { -mockEnvironment.getAcknowledgeCheckpointFuture().get(10L, TimeUnit.MILLISECONDS); -fail("The checkpoint should not get acknowledged."); -} catch (TimeoutException expected) { -// future should not be completed -} +assertThatThrownBy( +() -> { +// future should not be completed +mockEnvironment +.getAcknowledgeCheckpointFuture() +.get(10L, TimeUnit.MILLISECONDS); +fail("The checkpoint should not get acknowledged."); Review Comment: Yes. I'll check the same cases in the test. thx~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #22017: [BP-1.17][FLINK-31142][Table SQL/Client] Catch TokenMgrError in SqlCommandParserImpl#scan
flinkbot commented on PR #22017: URL: https://github.com/apache/flink/pull/22017#issuecomment-1443653369 ## CI report: * d9a32ec908f18c5b1f6dc79f51f61e0d5f275657 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] liuyongvs commented on a diff in pull request #21993: [FLINK-31166][table] Fix array_contains does not support null argumen…
liuyongvs commented on code in PR #21993: URL: https://github.com/apache/flink/pull/21993#discussion_r1116965989 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java: ## @@ -71,7 +71,11 @@ public final class SpecificInputTypeStrategies { /** Argument type derived from the array element type. */ public static final ArgumentTypeStrategy ARRAY_ELEMENT_ARG = -new ArrayElementArgumentTypeStrategy(); +new ArrayElementArgumentTypeStrategy(false); + +/** Argument type derived from the array element type. But leaves nullability untouched. */ Review Comment: The reason I added ARRAY_ELEMENT_ARG and ARRAY_ELEMENT_ARG_NULLABLE, analogous to COMMON_ARG/COMMON_ARG_NULLABLE, is that some array functions may need the element argument to be non-null, so I preserved both variants. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
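As a standalone illustration of why a nullable element argument matters, here is a minimal null-tolerant `ARRAY_CONTAINS`-style lookup; it is a sketch of one plausible semantics, not Flink's actual type-strategy or runtime code.

```java
// Null-tolerant ARRAY_CONTAINS-style lookup: a sketch of one plausible
// semantics (not Flink's actual implementation). A null array yields
// null; a null needle matches only a null element.
class ArrayContainsSketch {
    static Boolean contains(Object[] array, Object needle) {
        if (array == null) {
            return null; // NULL array -> NULL result
        }
        for (Object element : array) {
            // compare without risking a NullPointerException
            if (element == null ? needle == null : element.equals(needle)) {
                return Boolean.TRUE;
            }
        }
        return Boolean.FALSE;
    }
}
```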
[GitHub] [flink] snuyanzin opened a new pull request, #22017: [BP-1.17][FLINK-31142][Table SQL/Client] Catch TokenMgrError in SqlCommandParserImpl#scan
snuyanzin opened a new pull request, #22017: URL: https://github.com/apache/flink/pull/22017 This is 1.17 backport of https://github.com/apache/flink/pull/22007 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
rkhachatryan commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1116959052 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java: ## @@ -157,13 +165,21 @@ public Map calculateScore( .getMaxParallelism(), parallelism.get(evi.getJobVertexId()), evi.getSubtaskIndex()); +// Estimate state size per key group. For scoring, assume 1 if size estimate is 0 to +// accommodate for averaging non-zero states +Optional<Long> kgSizeMaybe = +stateSizeEstimates.estimate(evi.getJobVertexId()).map(e -> Math.max(e, 1L)); +if (!kgSizeMaybe.isPresent()) { +continue; +} Review Comment: We still need to consider this state: if we place the task on a different TM then it will have to download all its SST files (or am I missing something?) There are two methods for keyed state: 1. `handle.getStateSize()` returns the full state size 2. [`handle.getCheckpointedSize()`](https://github.com/apache/flink/blob/464ded1c2a0497255b70f711167c3b7ae52ea0f7/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java#L62) returns "incremental" state size As per above, `getStateSize` is used. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
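The `Math.max(e, 1L)` clamp discussed in this thread can be shown in a self-contained sketch (class and method names are illustrative, not the actual `StateLocalitySlotAssigner` API): a zero-byte size estimate still yields a positive per-key-group score, while a missing estimate skips the vertex entirely.

```java
import java.util.Optional;
import java.util.OptionalLong;

// Sketch of the score contribution per execution vertex: a zero-byte
// estimate is clamped to 1 so it still counts toward locality, and a
// missing estimate means the vertex is skipped. Names are illustrative.
class LocalityScoreSketch {
    static OptionalLong score(Optional<Long> stateSizeEstimate, int keyGroupsOnAllocation) {
        if (!stateSizeEstimate.isPresent()) {
            return OptionalLong.empty(); // no estimate -> skip (the 'continue' in the diff)
        }
        long kgSize = Math.max(stateSizeEstimate.get(), 1L); // clamp zero sizes to 1
        return OptionalLong.of(kgSize * keyGroupsOnAllocation);
    }
}
```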
[GitHub] [flink] flinkbot commented on pull request #22016: [FLINK-31174][doc] fix inconsistent data format between Learn-Flink Doc and flink-training-repo
flinkbot commented on PR #22016: URL: https://github.com/apache/flink/pull/22016#issuecomment-1443641876 ## CI report: * 8adeac4636d28c86512f53b09de689830fc3 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-3117) Storm Tick Tuples are not supported
[ https://issues.apache.org/jira/browse/FLINK-3117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-3117: -- Labels: pull-request-available (was: ) > Storm Tick Tuples are not supported > --- > > Key: FLINK-3117 > URL: https://issues.apache.org/jira/browse/FLINK-3117 > Project: Flink > Issue Type: Bug > Components: Legacy Components / Storm Compatibility >Affects Versions: 0.10.0, 0.10.1 >Reporter: Maximilian Michels >Priority: Major > Labels: pull-request-available > > Tick Tuples are Storm's mechanism for Windowing. Every N seconds, Storm > triggers emission of a tick tuple which can then be used as a signal to emit > or process data. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] ChenZhongPu opened a new pull request, #22016: [FLINK-3117][doc] fix inconsistent data format between Learn-Flink Doc and flink-training-repo
ChenZhongPu opened a new pull request, #22016: URL: https://github.com/apache/flink/pull/22016 Due to the previous [pr](https://github.com/apache/flink-training/pull/36), in which the data format of `TaxiRide` was changed, there is an inconsistency in [Data Pipelines & ETL](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/learn-flink/etl/). To be specific:
- Since we cannot obtain `startTime` and `endTime` at the same time, I changed the sample code to compute the (Euclidean) distance instead.
- I also ran the experiment using the updated code with parallelism 4 to get the new sample output.
- The Chinese translation is also updated.

Before:
```java
Interval rideInterval = new Interval(ride.startTime, ride.endTime);
Minutes duration = rideInterval.toDuration().toStandardMinutes();
out.collect(new Tuple2<>(ride.startCell, duration));
```

After:
```java
double distance = ride.getEuclideanDistance(ride.startLon, ride.startLat);
out.collect(new Tuple2<>(ride.startCell, distance));
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
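The distance computation the updated example relies on boils down to plain 2-D Euclidean distance; a minimal standalone sketch (not the actual `TaxiRide.getEuclideanDistance` signature from flink-training):

```java
// Plain 2-D Euclidean distance between two points; the real helper in
// flink-training works on longitude/latitude, this standalone version
// takes generic coordinates.
class EuclideanDistanceSketch {
    static double distance(double x1, double y1, double x2, double y2) {
        double dx = x2 - x1;
        double dy = y2 - y1;
        return Math.sqrt(dx * dx + dy * dy);
    }
}
```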
[GitHub] [flink] snuyanzin merged pull request #22007: [FLINK-31142][Table SQL/Client] Catch TokenMgrError in SqlCommandParserImpl#scan
snuyanzin merged PR #22007: URL: https://github.com/apache/flink/pull/22007 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org