[jira] [Created] (FLINK-31223) sql-client.sh fails to start with ssl enabled

2023-02-24 Thread macdoor615 (Jira)
macdoor615 created FLINK-31223:
--

 Summary: sql-client.sh fails to start with ssl enabled
 Key: FLINK-31223
 URL: https://issues.apache.org/jira/browse/FLINK-31223
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.17.0
Reporter: macdoor615
 Fix For: 1.17.0


*Version:* 1.17-SNAPSHOT *Commit:* c66ef25 

1. ssl disabled 

sql-client.sh works properly

2. ssl enabled

The web UI can be accessed at [https://url|https://url/].

Tasks can be submitted correctly through sql-gateway, and I can confirm that 
sql-gateway exposes the HTTP protocol, not HTTPS.

But sql-client.sh fails to start with the following exception. It seems that 
sql-client.sh expects the HTTPS protocol.

 
{code:java}
2023-02-25 14:43:19,317 INFO  org.apache.flink.configuration.Configuration      
           [] - Config uses fallback configuration key 'rest.port' instead of 
key 'rest.bind-port'
2023-02-25 14:43:19,343 INFO  
org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint   [] - Starting rest 
endpoint.
2023-02-25 14:43:19,713 INFO  
org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint   [] - Rest endpoint 
listening at localhost:44922
2023-02-25 14:43:19,715 INFO  org.apache.flink.table.client.SqlClient           
           [] - Start embedded gateway on port 44922
2023-02-25 14:43:20,040 INFO  
org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint   [] - Shutting down 
rest endpoint.
2023-02-25 14:43:20,088 INFO  
org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint   [] - Shut down 
complete.
2023-02-25 14:43:20,089 ERROR org.apache.flink.table.client.SqlClient           
           [] - SQL Client must stop.
org.apache.flink.table.client.SqlClientException: Failed to create the executor.
        at 
org.apache.flink.table.client.gateway.ExecutorImpl.<init>(ExecutorImpl.java:170)
 ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.table.client.gateway.ExecutorImpl.<init>(ExecutorImpl.java:113)
 ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.table.client.gateway.Executor.create(Executor.java:34) 
~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:110) 
~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:228) 
[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:179) 
[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Failed 
to get response.
        at 
org.apache.flink.table.client.gateway.ExecutorImpl.getResponse(ExecutorImpl.java:427)
 ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.table.client.gateway.ExecutorImpl.getResponse(ExecutorImpl.java:416)
 ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.table.client.gateway.ExecutorImpl.negotiateVersion(ExecutorImpl.java:447)
 ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.table.client.gateway.ExecutorImpl.<init>(ExecutorImpl.java:132)
 ~[flink-sql-client-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        ... 5 more
Caused by: 
org.apache.flink.shaded.netty4.io.netty.handler.codec.DecoderException: 
org.apache.flink.shaded.netty4.io.netty.handler.ssl.NotSslRecordException: not 
an SSL/TLS record: 
485454502f312e3120343034204e6f7420466f756e640d0a636f6e74656e742d747970653a206170706c69636174696f6e2f6a736f6e3b20636861727365743d5554462d380d0a6163636573732d636f6e74726f6c2d616c6c6f772d6f726967696e3a202a0d0a636f6e74656e742d6c656e6774683a2033380d0a0d0a7b226572726f7273223a5b224e6f7420666f756e643a202f6261642d72657175657374225d7d
        at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:489)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:280)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        ...
{code}
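For reference, the hex payload in the NotSslRecordException above decodes to a 
plaintext HTTP response, which matches the suspicion that the client performs 
a TLS handshake against an endpoint answering in plain HTTP. A minimal sketch 
to decode it (assuming Java 17+ for java.util.HexFormat; any hex decoder 
works):

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.HexFormat;

public class DecodePayload {
    public static void main(String[] args) {
        // The raw bytes reported by NotSslRecordException, split per HTTP line
        String hex =
                "485454502f312e3120343034204e6f7420466f756e640d0a"
                        + "636f6e74656e742d747970653a206170706c69636174696f6e2f6a736f6e"
                        + "3b20636861727365743d5554462d380d0a"
                        + "6163636573732d636f6e74726f6c2d616c6c6f772d6f726967696e3a202a0d0a"
                        + "636f6e74656e742d6c656e6774683a2033380d0a0d0a"
                        + "7b226572726f7273223a5b224e6f7420666f756e643a202f6261642d72657175657374225d7d";
        // Prints:
        // HTTP/1.1 404 Not Found
        // content-type: application/json; charset=UTF-8
        // access-control-allow-origin: *
        // content-length: 38
        //
        // {"errors":["Not found: /bad-request"]}
        System.out.println(new String(HexFormat.of().parseHex(hex), StandardCharsets.US_ASCII));
    }
}
{code}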

[GitHub] [flink] flinkbot commented on pull request #22021: [Hotfix] typo hotfix in TestingDispatcher

2023-02-24 Thread via GitHub


flinkbot commented on PR #22021:
URL: https://github.com/apache/flink/pull/22021#issuecomment-1445009374

   
   ## CI report:
   
   * 4202649dba573712f33c552efb26505abefb337d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] xuzifu666 commented on pull request #22021: [Hotfix] typo hotfix in TestingDispatcher

2023-02-24 Thread via GitHub


xuzifu666 commented on PR #22021:
URL: https://github.com/apache/flink/pull/22021#issuecomment-1445008817

   @zentol the method is never used; this hotfix removes it from TestingDispatcher.





[GitHub] [flink] xuzifu666 opened a new pull request, #22021: [Hotfix] typo hotfix in TestingDispatcher

2023-02-24 Thread via GitHub


xuzifu666 opened a new pull request, #22021:
URL: https://github.com/apache/flink/pull/22021

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





[GitHub] [flink] gaborgsomogyi commented on pull request #22009: [FLINK-31109][yarn] Support Hadoop proxy user when delegation token f…

2023-02-24 Thread via GitHub


gaborgsomogyi commented on PR #22009:
URL: https://github.com/apache/flink/pull/22009#issuecomment-1445007082

   @MartijnVisser everything is green, so it's good to go from my side.





[GitHub] [flink] flinkbot commented on pull request #22020: [FLINK-31222] Remove usage of deprecated ConverterUtils.toApplicationId.

2023-02-24 Thread via GitHub


flinkbot commented on PR #22020:
URL: https://github.com/apache/flink/pull/22020#issuecomment-1444987349

   
   ## CI report:
   
   * 4b7d8dcf55db34003ac5296887facfc6e5a8ce48 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-31222) Remove usage of deprecated ConverterUtils.toApplicationId

2023-02-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31222:
---
Labels: pull-request-available  (was: )

> Remove usage of deprecated ConverterUtils.toApplicationId
> -
>
> Key: FLINK-31222
> URL: https://issues.apache.org/jira/browse/FLINK-31222
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.17.1
>Reporter: Shilun Fan
>Priority: Major
>  Labels: pull-request-available
>
> While reading the code, I found that we use ConverterUtils.toApplicationId 
> to convert the applicationId. This method is deprecated; we should use 
> ApplicationId.fromString instead.





[GitHub] [flink] slfan1989 opened a new pull request, #22020: [FLINK-31222] Remove usage of deprecated ConverterUtils.toApplicationId.

2023-02-24 Thread via GitHub


slfan1989 opened a new pull request, #22020:
URL: https://github.com/apache/flink/pull/22020

   
   
   ## What is the purpose of the change
   JIRA: FLINK-31222. Remove usage of deprecated ConverterUtils.toApplicationId.
   While reading the code, I found that we use ConverterUtils.toApplicationId 
to convert the applicationId. This method is deprecated; we should use 
ApplicationId.fromString instead.
   
   ## Brief change log
   
   Remove usage of deprecated ConverterUtils.toApplicationId.
   
   ## Verifying this change
   
   Manually verified.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   





[jira] [Created] (FLINK-31222) Remove usage of deprecated ConverterUtils.toApplicationId

2023-02-24 Thread Shilun Fan (Jira)
Shilun Fan created FLINK-31222:
--

 Summary: Remove usage of deprecated ConverterUtils.toApplicationId
 Key: FLINK-31222
 URL: https://issues.apache.org/jira/browse/FLINK-31222
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / YARN
Affects Versions: 1.17.1
Reporter: Shilun Fan


While reading the code, I found that we use ConverterUtils.toApplicationId to 
convert the applicationId. This method is deprecated; we should use 
ApplicationId.fromString instead.
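
For illustration, the replacement is a drop-in change per call site; a minimal 
sketch (the applicationId string below is an assumed example value):

{code:java}
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.ConverterUtils;

String appIdStr = "application_1677225600000_0001"; // example value

// Deprecated:
ApplicationId viaConverterUtils = ConverterUtils.toApplicationId(appIdStr);

// Preferred replacement:
ApplicationId viaFromString = ApplicationId.fromString(appIdStr);
{code}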





[GitHub] [flink] JunRuiLee commented on pull request #21943: [FLINK-31077][runtime] Mark pending checkpoint onCompletionPromise complete only after the completed checkpoint is added to the store.

2023-02-24 Thread via GitHub


JunRuiLee commented on PR #21943:
URL: https://github.com/apache/flink/pull/21943#issuecomment-1444984134

   @flinkbot run azure





[jira] [Updated] (FLINK-31212) Data lost on interval left join with window group

2023-02-24 Thread Lyn Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lyn Zhang updated FLINK-31212:
--
Summary: Data lost on interval left join with window group  (was: Data lost 
if window group after interval left join)

> Data lost on interval left join with window group
> -
>
> Key: FLINK-31212
> URL: https://issues.apache.org/jira/browse/FLINK-31212
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.1
>Reporter: Lyn Zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-02-24-17-58-44-461.png, 
> image-2023-02-24-17-58-57-238.png, image-2023-02-24-17-59-25-179.png, 
> image-2023-02-24-18-00-52-891.png, test.sql
>
>
>  
> I have a case in [^test.sql] where records from table_1 that fail the left 
> join are discarded by the group window.
> I checked the interval join operator implementation: if a record from the 
> left table fails to join the right table, it is not emitted immediately but 
> only after half of the join bound time has passed. In test.sql, table_1 left 
> joins table_2 within a 5-minute bound, so the output is delayed by 2.5 
> minutes, which causes the window to discard those records.
> h2. testing
> h4. input:
> !image-2023-02-24-17-58-44-461.png!
> {"n":"n1","ts":"2023-02-24 14:00:00"}
> {"n":"n2","ts":"2023-02-24 14:00:00"}
> {"n":"n1","ts":"2023-02-24 14:06:01"}
> !image-2023-02-24-17-58-57-238.png!
> {"n":"n1","ts":"2023-02-24 14:00:00","v":111}
> {"n":"n1","ts":"2023-02-24 14:06:01","v":111}
> h4. output:
> expect:
> !image-2023-02-24-18-00-52-891.png!
> real:
> !image-2023-02-24-17-59-25-179.png!
> I removed this logic in [https://github.com/apache/flink/pull/22014]. Please 
> help to review this PR.
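
For illustration, a schematic of the query shape described above (table, 
column, and window definitions are assumed; the actual statements are in the 
attached [^test.sql]):

{code:sql}
-- Interval left join: a left row that finds no match is only emitted after
-- part of the join bound has elapsed (about 2.5 minutes for a 5-minute bound).
CREATE TEMPORARY VIEW joined AS
SELECT t1.n, t1.ts, t2.v
FROM table_1 t1
LEFT JOIN table_2 t2
  ON t1.n = t2.n
 AND t2.ts BETWEEN t1.ts - INTERVAL '5' MINUTE AND t1.ts;

-- Downstream group window: by the time the delayed row is emitted, the
-- window it belongs to has already fired, so the row is discarded.
SELECT n,
       TUMBLE_START(ts, INTERVAL '5' MINUTE) AS window_start,
       COUNT(*) AS cnt
FROM joined
GROUP BY n, TUMBLE(ts, INTERVAL '5' MINUTE);
{code}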





[GitHub] [flink] 1996fanrui commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-24 Thread via GitHub


1996fanrui commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1117858061


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -362,30 +358,35 @@ private void testSyncSavepointWithEndInput(
 "savepointResult");
 harness.processAll();
 
-Assert.assertEquals(expectEndInput, 
TestBoundedOneInputStreamOperator.isInputEnded());
+
assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput);
 }
 
 @Test
-public void testCleanUpExceptionSuppressing() throws Exception {
+void testCleanUpExceptionSuppressing() throws Exception {
 try (StreamTaskMailboxTestHarness testHarness =
 new 
StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
 .addInput(STRING_TYPE_INFO)
 .setupOutputForSingletonOperatorChain(new 
FailingTwiceOperator())
 .build()) {
 
-try {
-testHarness.processElement(new StreamRecord<>("Doesn't 
matter", 0));
-throw new RuntimeException("Expected an exception but ran 
successfully");
-} catch (Exception ex) {
-ExceptionUtils.assertThrowable(ex, 
ExpectedTestException.class);
-}
+assertThatThrownBy(
+() ->
+testHarness.processElement(
+new StreamRecord<>("Doesn't 
matter", 0)))
+.satisfies(
+(Consumer)
+throwable ->
+ExceptionUtils.assertThrowable(
+throwable, 
ExpectedTestException.class));
 
-try {
-testHarness.finishProcessing();
-} catch (Exception ex) {
-// todo: checking for suppression if there are more exceptions 
during cleanup
-ExceptionUtils.assertThrowable(ex, 
FailingTwiceOperator.CloseException.class);
-}
+// todo: checking for suppression if there are more exceptions 
during cleanup
+assertThatThrownBy(testHarness::finishProcessing)
+.satisfies(
+(ThrowingConsumer)
+throwable ->
+ExceptionUtils.assertThrowable(
+throwable,
+
FailingTwiceOperator.CloseException.class));

Review Comment:
   ```suggestion
               .satisfies(anyCauseMatches(FailingTwiceOperator.CloseException.class));
   ```
   
   
   Hi @RocMarshal, thanks for your clarification, I got it.
   
   Hi @reswqa, I found that `FlinkAssertions.anyCauseMatches` is similar to 
`ExceptionUtils.assertThrowable`, but it can find the throwable at a deeper 
level, and it's simpler than `ExceptionUtils.assertThrowable`.
   
   What do you think? Please also take a look at these comments [1][2]; they 
are similar to this one.
   
   [1] https://github.com/apache/flink/pull/21999#discussion_r1117222444
   [2] https://github.com/apache/flink/pull/21999#discussion_r1117225403






[GitHub] [flink] 1996fanrui commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-24 Thread via GitHub


1996fanrui commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1117858061


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -362,30 +358,35 @@ private void testSyncSavepointWithEndInput(
 "savepointResult");
 harness.processAll();
 
-Assert.assertEquals(expectEndInput, 
TestBoundedOneInputStreamOperator.isInputEnded());
+
assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput);
 }
 
 @Test
-public void testCleanUpExceptionSuppressing() throws Exception {
+void testCleanUpExceptionSuppressing() throws Exception {
 try (StreamTaskMailboxTestHarness testHarness =
 new 
StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
 .addInput(STRING_TYPE_INFO)
 .setupOutputForSingletonOperatorChain(new 
FailingTwiceOperator())
 .build()) {
 
-try {
-testHarness.processElement(new StreamRecord<>("Doesn't 
matter", 0));
-throw new RuntimeException("Expected an exception but ran 
successfully");
-} catch (Exception ex) {
-ExceptionUtils.assertThrowable(ex, 
ExpectedTestException.class);
-}
+assertThatThrownBy(
+() ->
+testHarness.processElement(
+new StreamRecord<>("Doesn't 
matter", 0)))
+.satisfies(
+(Consumer)
+throwable ->
+ExceptionUtils.assertThrowable(
+throwable, 
ExpectedTestException.class));
 
-try {
-testHarness.finishProcessing();
-} catch (Exception ex) {
-// todo: checking for suppression if there are more exceptions 
during cleanup
-ExceptionUtils.assertThrowable(ex, 
FailingTwiceOperator.CloseException.class);
-}
+// todo: checking for suppression if there are more exceptions 
during cleanup
+assertThatThrownBy(testHarness::finishProcessing)
+.satisfies(
+(ThrowingConsumer)
+throwable ->
+ExceptionUtils.assertThrowable(
+throwable,
+
FailingTwiceOperator.CloseException.class));

Review Comment:
   Hi @RocMarshal, thanks for your clarification, I got it.
   
   Hi @reswqa, I found that `FlinkAssertions.anyCauseMatches` is similar to 
`ExceptionUtils.assertThrowable`, but it can find the throwable at a deeper 
level, and it's better than `ExceptionUtils.assertThrowable`.
   
   What do you think?
   
   
   ```suggestion
               .satisfies(anyCauseMatches(FailingTwiceOperator.CloseException.class));
   ```
   
   






[GitHub] [flink] 1996fanrui commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-24 Thread via GitHub


1996fanrui commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1117858061


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -362,30 +358,35 @@ private void testSyncSavepointWithEndInput(
 "savepointResult");
 harness.processAll();
 
-Assert.assertEquals(expectEndInput, 
TestBoundedOneInputStreamOperator.isInputEnded());
+
assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput);
 }
 
 @Test
-public void testCleanUpExceptionSuppressing() throws Exception {
+void testCleanUpExceptionSuppressing() throws Exception {
 try (StreamTaskMailboxTestHarness testHarness =
 new 
StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
 .addInput(STRING_TYPE_INFO)
 .setupOutputForSingletonOperatorChain(new 
FailingTwiceOperator())
 .build()) {
 
-try {
-testHarness.processElement(new StreamRecord<>("Doesn't 
matter", 0));
-throw new RuntimeException("Expected an exception but ran 
successfully");
-} catch (Exception ex) {
-ExceptionUtils.assertThrowable(ex, 
ExpectedTestException.class);
-}
+assertThatThrownBy(
+() ->
+testHarness.processElement(
+new StreamRecord<>("Doesn't 
matter", 0)))
+.satisfies(
+(Consumer)
+throwable ->
+ExceptionUtils.assertThrowable(
+throwable, 
ExpectedTestException.class));
 
-try {
-testHarness.finishProcessing();
-} catch (Exception ex) {
-// todo: checking for suppression if there are more exceptions 
during cleanup
-ExceptionUtils.assertThrowable(ex, 
FailingTwiceOperator.CloseException.class);
-}
+// todo: checking for suppression if there are more exceptions 
during cleanup
+assertThatThrownBy(testHarness::finishProcessing)
+.satisfies(
+(ThrowingConsumer)
+throwable ->
+ExceptionUtils.assertThrowable(
+throwable,
+
FailingTwiceOperator.CloseException.class));

Review Comment:
   Hi @RocMarshal, thanks for your clarification, I got it.
   
   Hi @reswqa, I found that `FlinkAssertions.anyCauseMatches` is similar to 
`ExceptionUtils.assertThrowable`, but it can find the throwable at a deeper 
level, and it's better than `ExceptionUtils.assertThrowable`.
   
   What do you think?
   
   
   ```suggestion
   .satisfies(
   (ThrowingConsumer)
   throwable ->
   ExceptionUtils.assertThrowable(
   throwable,
   
FailingTwiceOperator.CloseException.class));
   ```
   
   






[GitHub] [flink] 1996fanrui commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-24 Thread via GitHub


1996fanrui commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1117222444


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -362,30 +358,35 @@ private void testSyncSavepointWithEndInput(
 "savepointResult");
 harness.processAll();
 
-Assert.assertEquals(expectEndInput, 
TestBoundedOneInputStreamOperator.isInputEnded());
+
assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput);
 }
 
 @Test
-public void testCleanUpExceptionSuppressing() throws Exception {
+void testCleanUpExceptionSuppressing() throws Exception {
 try (StreamTaskMailboxTestHarness testHarness =
 new 
StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
 .addInput(STRING_TYPE_INFO)
 .setupOutputForSingletonOperatorChain(new 
FailingTwiceOperator())
 .build()) {
 
-try {
-testHarness.processElement(new StreamRecord<>("Doesn't 
matter", 0));
-throw new RuntimeException("Expected an exception but ran 
successfully");
-} catch (Exception ex) {
-ExceptionUtils.assertThrowable(ex, 
ExpectedTestException.class);
-}
+assertThatThrownBy(
+() ->
+testHarness.processElement(
+new StreamRecord<>("Doesn't 
matter", 0)))
+.satisfies(
+(Consumer)
+throwable ->
+ExceptionUtils.assertThrowable(
+throwable, 
ExpectedTestException.class));

Review Comment:
   ```suggestion
   .satisfies(anyCauseMatches(ExpectedTestException.class));
   ```






[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-24 Thread via GitHub


RocMarshal commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1117840988


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -362,30 +358,35 @@ private void testSyncSavepointWithEndInput(
 "savepointResult");
 harness.processAll();
 
-Assert.assertEquals(expectEndInput, 
TestBoundedOneInputStreamOperator.isInputEnded());
+
assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput);
 }
 
 @Test
-public void testCleanUpExceptionSuppressing() throws Exception {
+void testCleanUpExceptionSuppressing() throws Exception {
 try (StreamTaskMailboxTestHarness testHarness =
 new 
StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
 .addInput(STRING_TYPE_INFO)
 .setupOutputForSingletonOperatorChain(new 
FailingTwiceOperator())
 .build()) {
 
-try {
-testHarness.processElement(new StreamRecord<>("Doesn't 
matter", 0));
-throw new RuntimeException("Expected an exception but ran 
successfully");
-} catch (Exception ex) {
-ExceptionUtils.assertThrowable(ex, 
ExpectedTestException.class);
-}
+assertThatThrownBy(
+() ->
+testHarness.processElement(
+new StreamRecord<>("Doesn't 
matter", 0)))
+.satisfies(
+(Consumer)
+throwable ->
+ExceptionUtils.assertThrowable(
+throwable, 
ExpectedTestException.class));
 
-try {
-testHarness.finishProcessing();
-} catch (Exception ex) {
-// todo: checking for suppression if there are more exceptions 
during cleanup
-ExceptionUtils.assertThrowable(ex, 
FailingTwiceOperator.CloseException.class);
-}
+// todo: checking for suppression if there are more exceptions 
during cleanup
+assertThatThrownBy(testHarness::finishProcessing)
+.satisfies(
+(ThrowingConsumer)
+throwable ->
+ExceptionUtils.assertThrowable(
+throwable,
+
FailingTwiceOperator.CloseException.class));

Review Comment:
Yes, thanks for the reply. That line is also my original migration line.
   @1996fanrui 
   @reswqa described the reason why we still need 
`ExceptionUtils.assertThrowable` at 
https://github.com/apache/flink/pull/21999#discussion_r1116493534: the depth 
of the exception.
   






[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-24 Thread via GitHub


RocMarshal commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1117840988


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -362,30 +358,35 @@ private void testSyncSavepointWithEndInput(
 "savepointResult");
 harness.processAll();
 
-Assert.assertEquals(expectEndInput, 
TestBoundedOneInputStreamOperator.isInputEnded());
+
assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput);
 }
 
 @Test
-public void testCleanUpExceptionSuppressing() throws Exception {
+void testCleanUpExceptionSuppressing() throws Exception {
 try (StreamTaskMailboxTestHarness testHarness =
 new 
StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
 .addInput(STRING_TYPE_INFO)
 .setupOutputForSingletonOperatorChain(new 
FailingTwiceOperator())
 .build()) {
 
-try {
-testHarness.processElement(new StreamRecord<>("Doesn't 
matter", 0));
-throw new RuntimeException("Expected an exception but ran 
successfully");
-} catch (Exception ex) {
-ExceptionUtils.assertThrowable(ex, 
ExpectedTestException.class);
-}
+assertThatThrownBy(
+() ->
+testHarness.processElement(
+new StreamRecord<>("Doesn't 
matter", 0)))
+.satisfies(
+(Consumer)
+throwable ->
+ExceptionUtils.assertThrowable(
+throwable, 
ExpectedTestException.class));
 
-try {
-testHarness.finishProcessing();
-} catch (Exception ex) {
-// todo: checking for suppression if there are more exceptions 
during cleanup
-ExceptionUtils.assertThrowable(ex, 
FailingTwiceOperator.CloseException.class);
-}
+// todo: checking for suppression if there are more exceptions 
during cleanup
+assertThatThrownBy(testHarness::finishProcessing)
+.satisfies(
+(ThrowingConsumer)
+throwable ->
+ExceptionUtils.assertThrowable(
+throwable,
+
FailingTwiceOperator.CloseException.class));

Review Comment:
   Yes, that line is also my original migration line.
   @1996fanrui 
   @reswqa described the reason why we still need 
`ExceptionUtils.assertThrowable` at 
https://github.com/apache/flink/pull/21999#discussion_r1116493534: the depth 
of the exception.
   






[GitHub] [flink] snuyanzin commented on pull request #22017: [BP-1.17][FLINK-31142][Table SQL/Client] Catch TokenMgrError in SqlCommandParserImpl#scan

2023-02-24 Thread via GitHub


snuyanzin commented on PR #22017:
URL: https://github.com/apache/flink/pull/22017#issuecomment-1444782448

   @reswqa this is just a backport to 1.17. Since you reviewed it at 
https://github.com/apache/flink/pull/22007, could you please have a look here? 





[GitHub] [flink] snuyanzin commented on a diff in pull request #21993: [FLINK-31166][table] Fix array_contains does not support null argumen…

2023-02-24 Thread via GitHub


snuyanzin commented on code in PR #21993:
URL: https://github.com/apache/flink/pull/21993#discussion_r1117821456


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayElementArgumentTypeStrategy.java:
##
@@ -31,24 +31,28 @@
 
 import java.util.Optional;
 
-import static 
org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast;
-
 /** Specific {@link ArgumentTypeStrategy} for {@link 
BuiltInFunctionDefinitions#ARRAY_CONTAINS}. */
 @Internal
 class ArrayElementArgumentTypeStrategy implements ArgumentTypeStrategy {
 
+private final boolean preserveNullability;
+
+public ArrayElementArgumentTypeStrategy(boolean preserveNullability) {
+this.preserveNullability = preserveNullability;
+}
+
 @Override
 public Optional inferArgumentType(
 CallContext callContext, int argumentPos, boolean throwOnFailure) {
 final ArrayType haystackType =
 (ArrayType) 
callContext.getArgumentDataTypes().get(0).getLogicalType();
 final LogicalType haystackElementType = haystackType.getElementType();
-final LogicalType needleType =
-
callContext.getArgumentDataTypes().get(argumentPos).getLogicalType();
-if (supportsImplicitCast(needleType, haystackElementType)) {

Review Comment:
   Based on the contract for 
`org.apache.flink.table.types.inference.ArgumentTypeStrategy#inferArgumentType`, 
it should return an empty value in case the input type could not be inferred. 
   Right now, this change violates that contract:
   
https://github.com/apache/flink/blob/464ded1c2a0497255b70f711167c3b7ae52ea0f7/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/ArgumentTypeStrategy.java#L33-L48






[jira] [Commented] (FLINK-31142) Some queries lead to abrupt sql client close

2023-02-24 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693366#comment-17693366
 ] 

Sergey Nuyanzin commented on FLINK-31142:
-

Merged to master: 464ded1c2a0497255b70f711167c3b7ae52ea0f7

> Some queries lead to abrupt sql client close
> 
>
> Key: FLINK-31142
> URL: https://issues.apache.org/jira/browse/FLINK-31142
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0
>Reporter: Sergey Nuyanzin
>Priority: Blocker
>  Labels: pull-request-available
>
> Although the behavior has changed in 1.17.0, I'm not sure whether it is a 
> blocker or not, since in both cases it is an invalid query.
> I set it to Blocker just because of the regression.
> The difference in the behavior is that before 1.17.0
> a query like 
> {code:sql}
> select /* multiline comment;
> {code}
> fails to execute, and the SQL client prompts for another query.
> In 1.17.0 it shuts down the session, failing with 
> {noformat}
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
> Could not read from command line.
>   at 
> org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:205)
>   at 
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:168)
>   at 
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:113)
>   at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:169)
>   at org.apache.flink.table.client.SqlClient.start(SqlClient.java:118)
>   at 
> org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:228)
>   at org.apache.flink.table.client.SqlClient.main(SqlClient.java:179)
> Caused by: org.apache.flink.sql.parser.impl.TokenMgrError: Lexical error at 
> line 1, column 29.  Encountered: <EOF> after : ""
>   at 
> org.apache.flink.sql.parser.impl.FlinkSqlParserImplTokenManager.getNextToken(FlinkSqlParserImplTokenManager.java:26752)
>   at 
> org.apache.flink.table.client.cli.parser.SqlCommandParserImpl$TokenIterator.scan(SqlCommandParserImpl.java:89)
>   at 
> org.apache.flink.table.client.cli.parser.SqlCommandParserImpl$TokenIterator.next(SqlCommandParserImpl.java:81)
>   at 
> org.apache.flink.table.client.cli.parser.SqlCommandParserImpl.checkIncompleteStatement(SqlCommandParserImpl.java:141)
>   at 
> org.apache.flink.table.client.cli.parser.SqlCommandParserImpl.getCommand(SqlCommandParserImpl.java:111)
>   at 
> org.apache.flink.table.client.cli.parser.SqlCommandParserImpl.parseStatement(SqlCommandParserImpl.java:52)
>   at 
> org.apache.flink.table.client.cli.parser.SqlMultiLineParser.parse(SqlMultiLineParser.java:82)
>   at 
> org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2964)
>   at 
> org.jline.reader.impl.LineReaderImpl$1.apply(LineReaderImpl.java:3778)
>   at 
> org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:679)
>   at 
> org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:183)
>   ... 6 more
> Shutting down the session...
> done.
> {noformat}





[GitHub] [flink] flinkbot commented on pull request #22019: [FLINK-31221] Fix Typo in YarnConfigOptions.

2023-02-24 Thread via GitHub


flinkbot commented on PR #22019:
URL: https://github.com/apache/flink/pull/22019#issuecomment-1444576741

   
   ## CI report:
   
   * 830cda0aa78f44f17c0e2e05e3f91657bb9863e7 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] slfan1989 commented on pull request #22019: [FLINK-31221] Fix Typo in YarnConfigOptions.

2023-02-24 Thread via GitHub


slfan1989 commented on PR #22019:
URL: https://github.com/apache/flink/pull/22019#issuecomment-1444573260

   After finding this typo, I checked all the code under the flink-yarn 
module; this is the only typo.





[jira] [Updated] (FLINK-31221) Fix Typo in YarnConfigOptions

2023-02-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31221:
---
Labels: pull-request-available  (was: )

> Fix Typo in YarnConfigOptions
> -
>
> Key: FLINK-31221
> URL: https://issues.apache.org/jira/browse/FLINK-31221
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.17.1
>Reporter: Shilun Fan
>Priority: Major
>  Labels: pull-request-available
>
> I found a typo in YarnConfigOptions; I will fix it:
> willl -> will
> {code:java}
> public static final ConfigOption<String> LOCALIZED_KEYTAB_PATH =
>  .
> "Local (on NodeManager) path where kerberos keytab 
> file will be"
> + " localized to. If "
> + SHIP_LOCAL_KEYTAB.key()
> + " set to "
> + "true, Flink willl ship the keytab file as 
> a YARN local "
> + "resource. In this case, the path is 
> relative to the local "
> + "resource directory. If set to false, Flink"
> + " will try to directly locate the keytab 
> from the path itself."); 
> {code}





[GitHub] [flink] slfan1989 opened a new pull request, #22019: [FLINK-31221] Fix Typo in YarnConfigOptions.

2023-02-24 Thread via GitHub


slfan1989 opened a new pull request, #22019:
URL: https://github.com/apache/flink/pull/22019

   
   
   ## What is the purpose of the change
   JIRA: FLINK-31221. Fix Typo in YarnConfigOptions.
   I found a typo in YarnConfigOptions; I will fix it.
   
   ## Brief change log
   
   Fix Typo Issue.
   
   ## Verifying this change
   
   Manually verified.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





[jira] [Created] (FLINK-31221) Fix Typo in YarnConfigOptions

2023-02-24 Thread Shilun Fan (Jira)
Shilun Fan created FLINK-31221:
--

 Summary: Fix Typo in YarnConfigOptions
 Key: FLINK-31221
 URL: https://issues.apache.org/jira/browse/FLINK-31221
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / YARN
Affects Versions: 1.17.1
Reporter: Shilun Fan


I found a typo in YarnConfigOptions; I will fix it:

willl -> will
{code:java}
public static final ConfigOption<String> LOCALIZED_KEYTAB_PATH =
 .
"Local (on NodeManager) path where kerberos keytab file 
will be"
+ " localized to. If "
+ SHIP_LOCAL_KEYTAB.key()
+ " set to "
+ "true, Flink willl ship the keytab file as a 
YARN local "
+ "resource. In this case, the path is relative 
to the local "
+ "resource directory. If set to false, Flink"
+ " will try to directly locate the keytab from 
the path itself."); 


{code}





[jira] [Updated] (FLINK-31216) Update kryo to current

2023-02-24 Thread Morey Straus (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Morey Straus updated FLINK-31216:
-
Description: kryo 2.24 is several years out of date and has a 
[deserialization 
vulnerability|https://github.com/EsotericSoftware/kryo/issues/942] associated 
with it.  Please update to current.  (was: kryo 2.24 is several years out of 
date and has a deserialization vulnerability associated with it.  Please update 
to current.)

> Update kryo to current
> --
>
> Key: FLINK-31216
> URL: https://issues.apache.org/jira/browse/FLINK-31216
> Project: Flink
>  Issue Type: Technical Debt
>Reporter: Morey Straus
>Priority: Major
>  Labels: security
>
> kryo 2.24 is several years out of date and has a [deserialization 
> vulnerability|https://github.com/EsotericSoftware/kryo/issues/942] associated 
> with it.  Please update to current.





[jira] [Commented] (FLINK-22793) HybridSource Table Implementation

2023-02-24 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693339#comment-17693339
 ] 

Thomas Weise commented on FLINK-22793:
--

[~nicholasjiang] thanks for the confirmation!

> HybridSource Table Implementation
> -
>
> Key: FLINK-22793
> URL: https://issues.apache.org/jira/browse/FLINK-22793
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Reporter: Nicholas Jiang
>Assignee: Ran Tao
>Priority: Major
>






[jira] [Assigned] (FLINK-22793) HybridSource Table Implementation

2023-02-24 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise reassigned FLINK-22793:


Assignee: Ran Tao  (was: Nicholas Jiang)

> HybridSource Table Implementation
> -
>
> Key: FLINK-22793
> URL: https://issues.apache.org/jira/browse/FLINK-22793
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Reporter: Nicholas Jiang
>Assignee: Ran Tao
>Priority: Major
>






[jira] [Commented] (FLINK-22793) HybridSource Table Implementation

2023-02-24 Thread Nicholas Jiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693322#comment-17693322
 ] 

Nicholas Jiang commented on FLINK-22793:


[~thw], of course, this can be assigned to [~lemonjing].

> HybridSource Table Implementation
> -
>
> Key: FLINK-22793
> URL: https://issues.apache.org/jira/browse/FLINK-22793
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Reporter: Nicholas Jiang
>Assignee: Nicholas Jiang
>Priority: Major
>






[jira] [Commented] (FLINK-31040) Looping pattern notFollowedBy at end missing an element

2023-02-24 Thread Nicholas Jiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693321#comment-17693321
 ] 

Nicholas Jiang commented on FLINK-31040:


[~Juntao Hu], [~martijnvisser], thanks for the report. A looping pattern with 
notFollowedBy at the end does indeed miss an element. Would you like to fix 
this?

> Looping pattern notFollowedBy at end missing an element
> ---
>
> Key: FLINK-31040
> URL: https://issues.apache.org/jira/browse/FLINK-31040
> Project: Flink
>  Issue Type: Bug
>  Components: Library / CEP
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Juntao Hu
>Priority: Major
>
> Pattern: begin("A", 
> SKIP_TO_NEXT).oneOrMore().consecutive().notFollowedBy("B").within(Time.milliseconds(3))
> Sequence: a1, a2, a3, a4 will produce results 
> [a1, a2], [a2, a3], [a3], which obviously should be [a1, a2, a3], [a2, a3, 
> a4], [a3, a4], [a4].
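
For reference, a minimal sketch of the pattern described above (the Event type 
and the conditions on "A" and "B" are assumed for illustration; only the 
pattern structure comes from the ticket):

{code:java}
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

Pattern<Event, Event> pattern =
        Pattern.<Event>begin("A", AfterMatchSkipStrategy.skipToNext())
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) {
                        return e.getName().startsWith("a"); // assumed condition
                    }
                })
                .oneOrMore()
                .consecutive()
                .notFollowedBy("B")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) {
                        return e.getName().startsWith("b"); // assumed condition
                    }
                })
                .within(Time.milliseconds(3));
{code}

With input a1, a2, a3, a4 this currently yields [a1, a2], [a2, a3], [a3] 
instead of the expected [a1, a2, a3], [a2, a3, a4], [a3, a4], [a4].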





[GitHub] [flink] flinkbot commented on pull request #22018: [docs][hotfix] correct label in config configuration

2023-02-24 Thread via GitHub


flinkbot commented on PR #22018:
URL: https://github.com/apache/flink/pull/22018#issuecomment-1444277820

   
   ## CI report:
   
   * 3e076d84d04b65e4a7fb6bfffe60b70643951629 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] Samrat002 opened a new pull request, #22018: [docs][hotfix] correct label in config configuration

2023-02-24 Thread via GitHub


Samrat002 opened a new pull request, #22018:
URL: https://github.com/apache/flink/pull/22018

   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no) no 
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no) no
 - The serializers: (yes / no / don't know) no
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know) no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) 
no
 - The S3 file system connector: (yes / no / don't know) no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no) no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented) no
   





[jira] [Created] (FLINK-31220) Replace Pod with PodTemplateSpec for the pod template properties

2023-02-24 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-31220:
--

 Summary: Replace Pod with PodTemplateSpec for the pod template 
properties
 Key: FLINK-31220
 URL: https://issues.apache.org/jira/browse/FLINK-31220
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora


The current podTemplate fields in the CR use the Pod object for their schema. 
This doesn't make sense, as status and other fields should never be specified 
and have no effect.

We should replace this with PodTemplateSpec and make sure that this is not a 
breaking change even if users incorrectly specified status before.





[jira] [Commented] (FLINK-31066) Release Testing: Verify FLINK-29932 Upgrade Calcite to 1.29.0

2023-02-24 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693275#comment-17693275
 ] 

Matthias Pohl commented on FLINK-31066:
---

I should have remembered that backwards compatibility between minor Flink 
versions is not guaranteed when the execution plan changes 
[[1]|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution]
 
[[2]|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#table-api--sql].
 So, I guess, there's no point in checking this. [~Sergey Nuyanzin] do you 
consider it sufficient to show that the Calcite update can be observed through 
the Flink UI?

> Release Testing: Verify FLINK-29932 Upgrade Calcite to 1.29.0
> -
>
> Key: FLINK-31066
> URL: https://issues.apache.org/jira/browse/FLINK-31066
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Sergey Nuyanzin
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: release-testing
> Attachments: SavepointReleaseTesting.java
>
>
> In fact, this is a task to check all 3 Calcite upgrade related issues 
> (1.27.0, 1.28.0 and 1.29.0).
> Since optimizations for Sarg were added in Calcite 1.27.0, it would make 
> sense to check that different queries with the Sarg operator work correctly.
> It would also make sense to check that SQL jobs with Sarg-related queries 
> can be restored from a previous Flink version.
> An example of SQL 
> {code:sql}
> SELECT a FROM MyTable WHERE a = 1 or a = 2 or a IS NOT NULL;{code}
> {code:sql}
> SELECT a FROM MyTable WHERE a = 1 or a = 2 or a IS NULL;
> {code}
> where MyTable is for instance
> {code:sql}
> CREATE TABLE MyTable (
>    a bigint,
>    b int not null,
>    c varchar,
>    d timestamp(3)
> ) with (...) 
> {code}





[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1117292622


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.OperatorIDPair;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toMap;
+
+/** Managed Keyed State size estimates used to make scheduling decisions. */
+class StateSizeEstimates {
+    private final Map<JobVertexID, Long> averages;
+
+    public StateSizeEstimates() {
+        this(Collections.emptyMap());
+    }
+
+    public StateSizeEstimates(Map<JobVertexID, Long> averages) {
+        this.averages = averages;
+    }
+
+    public Optional<Long> estimate(JobVertexID jobVertexId) {
+        return Optional.ofNullable(averages.get(jobVertexId));
+    }
+
+    static StateSizeEstimates empty() {
+        return new StateSizeEstimates();
+    }
+
+    static StateSizeEstimates fromGraph(@Nullable ExecutionGraph executionGraph) {
+        return Optional.ofNullable(executionGraph)
+                .flatMap(graph -> Optional.ofNullable(graph.getCheckpointCoordinator()))
+                .flatMap(coordinator -> Optional.ofNullable(coordinator.getCheckpointStore()))
+                .flatMap(store -> Optional.ofNullable(store.getLatestCheckpoint()))
+                .map(cp -> build(fromCompletedCheckpoint(cp), mapVerticesToOperators(executionGraph)))
+                .orElse(empty());
+    }
+
+    private static StateSizeEstimates build(
+            Map<OperatorID, Long> sizePerOperator,
+            Map<JobVertexID, Set<OperatorID>> verticesToOperators) {
+        Map<JobVertexID, Long> verticesToSizes =
+                verticesToOperators.entrySet().stream()
+                        .collect(toMap(Map.Entry::getKey, e -> size(e.getValue(), sizePerOperator)));
+        return new StateSizeEstimates(verticesToSizes);
+    }
+
+    private static long size(Set<OperatorID> ids, Map<OperatorID, Long> sizes) {
+        return ids.stream()
+                .mapToLong(key -> sizes.getOrDefault(key, 0L))
+                .boxed()
+                .reduce(Long::sum)
+                .orElse(0L);
+    }
+
+    private static Map<JobVertexID, Set<OperatorID>> mapVerticesToOperators(
+            ExecutionGraph executionGraph) {
+        return executionGraph.getAllVertices().entrySet().stream()
+                .collect(toMap(Map.Entry::getKey, e -> getOperatorIDS(e.getValue())));
+    }
+
+    private static Set<OperatorID> getOperatorIDS(ExecutionJobVertex v) {
+        return v.getOperatorIDs().stream()
+                .map(OperatorIDPair::getGeneratedOperatorID)
+                .collect(Collectors.toSet());
+    }
+
+    private static Map<OperatorID, Long> fromCompletedCheckpoint(CompletedCheckpoint cp) {
+        Stream<Map.Entry<OperatorID, OperatorState>> states =
+                cp.getOperatorStates().entrySet().stream();
+        Map<OperatorID, Long> estimates =
+                states.collect(
+                        toMap(Map.Entry::getKey, e -> estimateKeyGroupStateSize(e.getValue())));
+        return estimates;
+    }
+
+    private static long estimateKeyGroupStateSize(OperatorState state) {
+        Stream<KeyedStateHandle> handles =
+                state.getSubtaskStates().values().stream()
+                        .flatMap(s -> s.getManagedKeyedState().stream());
+        Stream<Tuple2<Long, Integer>> sizeAndCount =
+                handles.map(
+  
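
A side note on the snippet above: `fromGraph` wraps every step in `Optional.ofNullable`, so a missing execution graph, checkpoint coordinator, checkpoint store, or completed checkpoint degrades to `empty()` instead of throwing. A minimal caller-side sketch (the vertex id is hypothetical, for illustration only):

```java
JobVertexID someVertex = new JobVertexID();                        // hypothetical id
StateSizeEstimates estimates = StateSizeEstimates.fromGraph(null); // no graph yet -> empty()
long estimatedBytes = estimates.estimate(someVertex).orElse(0L);   // absent estimate -> 0 bytes
```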

[GitHub] [flink] twalthr commented on a diff in pull request #19623: [FLINK-27471][table] Add ARRAY_DISTINCT function

2023-02-24 Thread via GitHub


twalthr commented on code in PR #19623:
URL: https://github.com/apache/flink/pull/19623#discussion_r1117290234


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayDistinctFunction.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_DISTINCT}. */
+@Internal
+public class ArrayDistinctFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter elementGetter;
+
+    public ArrayDistinctFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.ARRAY_DISTINCT, context);
+        final DataType dataType =
+                ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType();
+        elementGetter = ArrayData.createElementGetter(dataType.getLogicalType());
+    }
+
+    public @Nullable ArrayData eval(ArrayData haystack) {
+        try {
+            if (haystack == null) {
+                return null;
+            }
+            Set<Object> set = new LinkedHashSet<>();
+            final int size = haystack.size();
+            for (int pos = 0; pos < size; pos++) {
+                final Object element = elementGetter.getElementOrNull(haystack, pos);
+                set.add(element);

Review Comment:
   I don’t have the time to find a bug, but I just wanted to point out the 
existence of the expression evaluator to get access to primitive functions such 
as CAST or EQUALS. CC @liuyongvs 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] twalthr commented on a diff in pull request #19623: [FLINK-27471][table] Add ARRAY_DISTINCT function

2023-02-24 Thread via GitHub


twalthr commented on code in PR #19623:
URL: https://github.com/apache/flink/pull/19623#discussion_r1117286705


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayDistinctFunction.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_DISTINCT}. */
+@Internal
+public class ArrayDistinctFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter elementGetter;
+
+    public ArrayDistinctFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.ARRAY_DISTINCT, context);
+        final DataType dataType =
+                ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType();
+        elementGetter = ArrayData.createElementGetter(dataType.getLogicalType());
+    }
+
+    public @Nullable ArrayData eval(ArrayData haystack) {
+        try {
+            if (haystack == null) {
+                return null;
+            }
+            Set<Object> set = new LinkedHashSet<>();
+            final int size = haystack.size();
+            for (int pos = 0; pos < size; pos++) {
+                final Object element = elementGetter.getElementOrNull(haystack, pos);
+                set.add(element);

Review Comment:
   @snuyanzin We need to be careful with equality of `RowData`, `MapData` and 
others. I'm not saying that this implementation is wrong, maybe it works for 
most cases. But I'm also not sure whether there is a hidden bug.
   
   If you look at `BinaryRowData`, it compares binary sections. Which means 
that equality is not based on SQL semantics. This is actually why I added 
`SpecializedFunction` with a `ExpressionEvaluatorFactory`. Only with this piece 
it is now possible to call the SQL compliant equals function which can be used 
to assemble a proper distinct for all data types.
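
   For readers following along, below is a rough sketch of how a collection function can obtain such SQL-compliant equality through the evaluator factory. It mirrors the pattern of sibling collection functions and is illustrative only, not the final shape of this PR; the field names `a`/`b` and the `equality*` members are placeholders.

```java
// Sketch only. Assumes a static import of org.apache.flink.table.api.Expressions.$
// and the nested org.apache.flink.table.functions.SpecializedFunction.ExpressionEvaluator.
private final ExpressionEvaluator equalityEvaluator;
private transient MethodHandle equalityHandle;

public ArrayDistinctFunction(SpecializedFunction.SpecializedContext context) {
    super(BuiltInFunctionDefinitions.ARRAY_DISTINCT, context);
    final DataType elementType =
            ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
                    .getElementDataType();
    equalityEvaluator =
            context.createEvaluator(
                    $("a").isEqual($("b")),
                    DataTypes.BOOLEAN(),
                    DataTypes.FIELD("a", elementType.notNull().toInternal()),
                    DataTypes.FIELD("b", elementType.notNull().toInternal()));
}

@Override
public void open(FunctionContext context) throws Exception {
    // Compiles the expression into a MethodHandle with SQL equality semantics.
    equalityHandle = equalityEvaluator.open(context);
}

// At eval time, compare two internal elements with SQL semantics:
// boolean equal = (boolean) equalityHandle.invoke(element1, element2);
```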



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] venkata91 commented on a diff in pull request #22009: [FLINK-31109][yarn] Support Hadoop proxy user when delegation token f…

2023-02-24 Thread via GitHub


venkata91 commented on code in PR #22009:
URL: https://github.com/apache/flink/pull/22009#discussion_r1117272256


##
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProviderITCase.java:
##
@@ -57,7 +58,7 @@ public void isLoginPossibleMustReturnFalseByDefault() throws 
IOException {
 UserGroupInformation userGroupInformation = 
mock(UserGroupInformation.class);
 
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
 
-assertFalse(kerberosLoginProvider.isLoginPossible());
+assertFalse(kerberosLoginProvider.isLoginPossible(true));

Review Comment:
   Addressed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31207) Supports high order function like other engine

2023-02-24 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693263#comment-17693263
 ] 

Timo Walther commented on FLINK-31207:
--

[~jackylau] I would propose to make this a top-level issue. It goes beyond 
adding a built-in function and adds a whole new category of functions.

> Supports high order function like other engine
> --
>
> Key: FLINK-31207
> URL: https://issues.apache.org/jira/browse/FLINK-31207
> Project: Flink
>  Issue Type: Sub-task
>Reporter: jackylau
>Priority: Major
>
> Spark [https://spark.apache.org/docs/latest/api/sql/index.html#transform]
> supports transform/transform_keys/transform_values.
> Now that Calcite https://issues.apache.org/jira/browse/CALCITE-3679 supports 
> high-order functions, we should support many high-order functions like 
> Spark/Presto do.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31219) Add support for canary resources

2023-02-24 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-31219:
--

 Summary: Add support for canary resources
 Key: FLINK-31219
 URL: https://issues.apache.org/jira/browse/FLINK-31219
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Gyula Fora
Assignee: Gyula Fora


While the current health probe mechanism is able to detect different types of 
errors, such as startup / informer issues, it would be generally beneficial to 
also allow a simple canary mechanism that can verify that the operator receives 
updates and reconciles resources in a timely manner.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] venkata91 commented on a diff in pull request #22009: [FLINK-31109][yarn] Support Hadoop proxy user when delegation token f…

2023-02-24 Thread via GitHub


venkata91 commented on code in PR #22009:
URL: https://github.com/apache/flink/pull/22009#discussion_r1117217398


##
flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java:
##
@@ -70,10 +71,22 @@ public void install() throws SecurityInstallException {
 
 try {
 KerberosLoginProvider kerberosLoginProvider = new 
KerberosLoginProvider(securityConfig);
-if (kerberosLoginProvider.isLoginPossible()) {
-kerberosLoginProvider.doLogin();
+if (kerberosLoginProvider.isLoginPossible(true)) {
+kerberosLoginProvider.doLogin(true);
 loginUser = UserGroupInformation.getLoginUser();
 
+if (HadoopUserUtils.isProxyUser((loginUser))

Review Comment:
   Just wanted to confirm that we don't need any change here right now? Rather, 
your comment is more for the future: handling proxy user support w/ non-Hadoop 
DTs like S3. I thought about this case as well. Good point!
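
   As background for the proxy-user discussion, Hadoop's impersonation API itself is small. A self-contained sketch with a made-up proxied user, assuming core-site.xml whitelists the real user via `hadoop.proxyuser.<realUser>.hosts`/`.groups`:

```java
import org.apache.hadoop.security.UserGroupInformation;

import java.security.PrivilegedExceptionAction;

public class ProxyUserSketch {
    public static void main(String[] args) throws Exception {
        // The authenticated login user impersonates "alice" (a made-up name).
        UserGroupInformation realUser = UserGroupInformation.getLoginUser();
        UserGroupInformation proxyUgi =
                UserGroupInformation.createProxyUser("alice", realUser);
        // Everything inside doAs (e.g. obtaining delegation tokens) runs as "alice".
        proxyUgi.doAs(
                (PrivilegedExceptionAction<Void>)
                        () -> {
                            System.out.println(
                                    UserGroupInformation.getCurrentUser().getUserName());
                            return null;
                        });
    }
}
```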



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-31218) Improve health probe to recognize informers already failed at start

2023-02-24 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-31218:
--

 Summary: Improve health probe to recognize informers already 
failed at start
 Key: FLINK-31218
 URL: https://issues.apache.org/jira/browse/FLINK-31218
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora
Assignee: Gyula Fora


The current health probe will report unhealthy if any informers cannot start. 
This contradicts the setting that allows the operator to start while some 
informers are unhealthy (and keep trying to start them).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1117237527


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocationsInfo.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Collections.emptyMap;
+
+class AllocationsInfo {

Review Comment:
   Yes, I agree.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-24 Thread via GitHub


1996fanrui commented on PR #21999:
URL: https://github.com/apache/flink/pull/21999#issuecomment-1443901443

   And please squash all review commits into the first commit after addressing 
all comments, thanks~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


dmvk commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1117232460


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##
@@ -784,7 +788,7 @@ public ArchivedExecutionGraph getArchivedExecutionGraph(
 }
 
 @Override
-public void goToWaitingForResources() {
+public void goToWaitingForResources(ExecutionGraph executionGraph) {

Review Comment:
   ```suggestion
   public void goToWaitingForResources(@Nullable ExecutionGraph 
executionGraph) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


dmvk commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1117232018


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java:
##
@@ -65,17 +69,18 @@ class WaitingForResources implements State, 
ResourceConsumer {
 desiredResources,
 initialResourceAllocationTimeout,
 resourceStabilizationTimeout,
-SystemClock.getInstance());
+SystemClock.getInstance(),
+null);
 }
 
-@VisibleForTesting
 WaitingForResources(
 Context context,
 Logger log,
 ResourceCounter desiredResources,
 Duration initialResourceAllocationTimeout,
 Duration resourceStabilizationTimeout,
-Clock clock) {
+Clock clock,
+ExecutionGraph previousExecutionGraph) {

Review Comment:
   ```suggestion
   @Nullable ExecutionGraph previousExecutionGraph) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] twalthr commented on a diff in pull request #21500: [FLINK-27995][table] Upgrade Janino version to 3.1.9

2023-02-24 Thread via GitHub


twalthr commented on code in PR #21500:
URL: https://github.com/apache/flink/pull/21500#discussion_r1117229017


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala:
##
@@ -1020,16 +1021,19 @@ object CodeGenUtils {
 }
 
 // convert internal format to target type
-val externalResultTerm = if (isInternalClass(targetDataType)) {
-  s"($targetTypeTerm) ${internalExpr.resultTerm}"
+val (externalResultTerm, resultType) = if 
(isInternalClass(targetDataType)) {

Review Comment:
   Can we use more meaningful names here? `resultType` does not improve code 
readability; `externalResultTypeTerm` is the right name here.



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala:
##
@@ -922,7 +922,8 @@ object CodeGenUtils {
 if (targetDataType.getConversionClass.isPrimitive) {
   externalResultTerm
 } else {
-  s"${internalExpr.nullTerm} ? null : ($externalResultTerm)"
+  // Cast of null is required because of janino issue 
https://github.com/janino-compiler/janino/issues/188
+  s"${internalExpr.nullTerm} ? 
(${typeTerm(targetDataType.getConversionClass)}) null : ($externalResultTerm)"

Review Comment:
   introduce a variable `externalResultTypeTerm = 
typeTerm(targetDataType.getConversionClass)`
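
   To make the referenced workaround concrete, here is a tiny stand-alone illustration of the pattern the generated Java code follows; `flag` and `value` are placeholders, and the claim that an uncast `? null :` branch trips Janino comes from the linked issue, not from verification here:

```java
public class TernaryNullCast {
    public static void main(String[] args) {
        boolean flag = args.length > 0;
        long value = 42L;
        // The generated code casts the null branch explicitly so that Janino
        // can resolve the type of the conditional expression.
        Long result = flag ? (Long) null : (Long) value;
        System.out.println(result);
    }
}
```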



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


dmvk commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1117230371


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##
@@ -978,8 +983,8 @@ public CreatingExecutionGraph.AssignmentResult 
tryToAssignSlots(
 executionGraph.setInternalTaskFailuresListener(
 new UpdateSchedulerNgOnInternalFailuresListener(this));
 
-final VertexParallelism vertexParallelism =
-executionGraphWithVertexParallelism.getVertexParallelism();
+final JobSchedulingPlan vertexParallelism =

Review Comment:
   nit: rename variable



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-24 Thread via GitHub


1996fanrui commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1117225403


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -1103,35 +1143,41 @@ private void 
testFailToConfirmCheckpointMessage(Consumer> consu
  * finished.
  */
 @Test
-public void testCheckpointFailueOnClosedOperator() throws Throwable {
+void testCheckpointFailueOnClosedOperator() throws Exception {
 ClosingOperator operator = new ClosingOperator<>();
 StreamTaskMailboxTestHarnessBuilder builder =
 new StreamTaskMailboxTestHarnessBuilder<>(
 OneInputStreamTask::new, 
BasicTypeInfo.INT_TYPE_INFO)
 .addInput(BasicTypeInfo.INT_TYPE_INFO);
 try (StreamTaskMailboxTestHarness harness =
 
builder.setupOutputForSingletonOperatorChain(operator).build()) {
-// keeps the mailbox from suspending
-harness.setAutoProcess(false);
-harness.processElement(new StreamRecord<>(1));
-
-harness.streamTask.operatorChain.finishOperators(
-harness.streamTask.getActionExecutor(), StopMode.DRAIN);
-harness.streamTask.operatorChain.closeAllOperators();
-assertTrue(ClosingOperator.closed.get());
-
-harness.streamTask.triggerCheckpointOnBarrier(
-new CheckpointMetaData(1, 0),
-CheckpointOptions.forCheckpointWithDefaultLocation(),
-new CheckpointMetricsBuilder());
-} catch (Exception ex) {
-ExceptionUtils.assertThrowableWithMessage(
-ex, "OperatorChain and Task should never be closed at this 
point");
+assertThatThrownBy(
+() -> {
+// keeps the mailbox from suspending
+harness.setAutoProcess(false);
+harness.processElement(new StreamRecord<>(1));
+
+
harness.streamTask.operatorChain.finishOperators(
+
harness.streamTask.getActionExecutor(), StopMode.DRAIN);
+
harness.streamTask.operatorChain.closeAllOperators();
+
assertThat(ClosingOperator.closed.get()).isTrue();
+
+harness.streamTask.triggerCheckpointOnBarrier(
+new CheckpointMetaData(1, 0),
+
CheckpointOptions.forCheckpointWithDefaultLocation(),
+new CheckpointMetricsBuilder());
+})
+.satisfies(
+(Consumer)
+throwable ->
+
ExceptionUtils.assertThrowableWithMessage(
+throwable,
+"OperatorChain and Task 
should never be closed at this point"));

Review Comment:
   ```suggestion
   .hasRootCauseMessage("OperatorChain and Task should 
never be closed at this point");
   ```
   
   It works.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-24 Thread via GitHub


1996fanrui commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1117222444


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -362,30 +358,35 @@ private void testSyncSavepointWithEndInput(
 "savepointResult");
 harness.processAll();
 
-Assert.assertEquals(expectEndInput, 
TestBoundedOneInputStreamOperator.isInputEnded());
+
assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput);
 }
 
 @Test
-public void testCleanUpExceptionSuppressing() throws Exception {
+void testCleanUpExceptionSuppressing() throws Exception {
 try (StreamTaskMailboxTestHarness testHarness =
 new 
StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
 .addInput(STRING_TYPE_INFO)
 .setupOutputForSingletonOperatorChain(new 
FailingTwiceOperator())
 .build()) {
 
-try {
-testHarness.processElement(new StreamRecord<>("Doesn't 
matter", 0));
-throw new RuntimeException("Expected an exception but ran 
successfully");
-} catch (Exception ex) {
-ExceptionUtils.assertThrowable(ex, 
ExpectedTestException.class);
-}
+assertThatThrownBy(
+() ->
+testHarness.processElement(
+new StreamRecord<>("Doesn't 
matter", 0)))
+.satisfies(
+(Consumer)
+throwable ->
+ExceptionUtils.assertThrowable(
+throwable, 
ExpectedTestException.class));

Review Comment:
   ```suggestion
   .isInstanceOf(ExpectedTestException.class);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


dmvk commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1117221334


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.OperatorIDPair;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toMap;
+
+/** Managed Keyed State size estimates used to make scheduling decisions. */
+class StateSizeEstimates {

Review Comment:
   sounds good



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


dmvk commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1117220303


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups equally. */
+public class DefaultSlotAssigner implements SlotAssigner {
+
+    @Override
+    public Collection<SlotAssignment> assignSlots(
+            JobInformation jobInformation,
+            Collection<? extends SlotInfo> freeSlots,
+            VertexParallelism vertexParallelism,
+            AllocationsInfo previousAllocations,
+            StateSizeEstimates stateSizeEstimates) {
+        List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+        for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
+            allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
+        }
+
+        Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+        Collection<SlotAssignment> assignments = new ArrayList<>();
+        for (ExecutionSlotSharingGroup group : allGroups) {
+            assignments.add(new SlotAssignment(iterator.next(), group));

Review Comment:
   Sounds good, I'd also be fine with adding a simple precondition
   
   ```
   Preconditions.checkState(freeSlots.size() > allGroups.size(), "...");
   ```



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups equally. */
+public class DefaultSlotAssigner implements SlotAssigner {
+
+    @Override
+    public Collection<SlotAssignment> assignSlots(
+            JobInformation jobInformation,
+            Collection<? extends SlotInfo> freeSlots,
+            VertexParallelism vertexParallelism,
+            AllocationsInfo previousAllocations,
+

[GitHub] [flink] 1996fanrui commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-24 Thread via GitHub


1996fanrui commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1117218805


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -362,30 +358,35 @@ private void testSyncSavepointWithEndInput(
 "savepointResult");
 harness.processAll();
 
-Assert.assertEquals(expectEndInput, 
TestBoundedOneInputStreamOperator.isInputEnded());
+
assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput);
 }
 
 @Test
-public void testCleanUpExceptionSuppressing() throws Exception {
+void testCleanUpExceptionSuppressing() throws Exception {
 try (StreamTaskMailboxTestHarness testHarness =
 new 
StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
 .addInput(STRING_TYPE_INFO)
 .setupOutputForSingletonOperatorChain(new 
FailingTwiceOperator())
 .build()) {
 
-try {
-testHarness.processElement(new StreamRecord<>("Doesn't 
matter", 0));
-throw new RuntimeException("Expected an exception but ran 
successfully");
-} catch (Exception ex) {
-ExceptionUtils.assertThrowable(ex, 
ExpectedTestException.class);
-}
+assertThatThrownBy(
+() ->
+testHarness.processElement(
+new StreamRecord<>("Doesn't 
matter", 0)))
+.satisfies(
+(Consumer)
+throwable ->
+ExceptionUtils.assertThrowable(
+throwable, 
ExpectedTestException.class));
 
-try {
-testHarness.finishProcessing();
-} catch (Exception ex) {
-// todo: checking for suppression if there are more exceptions 
during cleanup
-ExceptionUtils.assertThrowable(ex, 
FailingTwiceOperator.CloseException.class);
-}
+// todo: checking for suppression if there are more exceptions 
during cleanup
+assertThatThrownBy(testHarness::finishProcessing)
+.satisfies(
+(ThrowingConsumer)
+throwable ->
+ExceptionUtils.assertThrowable(
+throwable,
+
FailingTwiceOperator.CloseException.class));

Review Comment:
   ```suggestion
   .isInstanceOf(FailingTwiceOperator.CloseException.class);
   ```
   
   I have tried it, it works.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


dmvk commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1117216690


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocationsInfo.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Collections.emptyMap;
+
+class AllocationsInfo {

Review Comment:
   nit: it would be great to make the naming consistent with `JobInformation`; 
e.g. calling the outer class `JobAllocationInformation` and the inner one 
`VertexAllocationInformation`. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


dmvk commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1117209507


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.createExecutionSlotSharingGroups;
+
+/** A {@link SlotAssigner} that assigns slots based on the number of local key groups. */
+@Internal
+public class StateLocalitySlotAssigner implements SlotAssigner {
+
+    private static class AllocationScore implements Comparable<AllocationScore> {
+
+        private final String group;
+        private final AllocationID allocationId;
+
+        public AllocationScore(String group, AllocationID allocationId, long score) {
+            this.group = group;
+            this.allocationId = allocationId;
+            this.score = score;
+        }
+
+        private final long score;
+
+        public String getGroup() {
+            return group;
+        }
+
+        public AllocationID getAllocationId() {
+            return allocationId;
+        }
+
+        public long getScore() {
+            return score;
+        }
+
+        @Override
+        public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
+            int result = Long.compare(score, other.score);
+            if (result != 0) {
+                return result;
+            }
+            result = other.allocationId.compareTo(allocationId);
+            if (result != 0) {
+                return result;
+            }
+            return other.group.compareTo(group);
+        }
+    }
+
+    @Override
+    public Collection<SlotAssignment> assignSlots(
+            JobInformation jobInformation,
+            Collection<? extends SlotInfo> freeSlots,
+            VertexParallelism vertexParallelism,
+            AllocationsInfo previousAllocations,
+            StateSizeEstimates stateSizeEstimates) {
+        final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+        for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
+            allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
+        }
+        final Map<JobVertexID, Integer> parallelism = getParallelism(allGroups);
+
+        // PQ orders the pairs (allocationID, groupID) by score, decreasing
+        // the score is computed as the potential amount of state that would reside locally
+        final PriorityQueue<AllocationScore> scores =
+                new PriorityQueue<>(Comparator.reverseOrder());
+        for (ExecutionSlotSharingGroup group : allGroups) {
+            calculateScore(group, parallelism, jobInformation, previousAllocations, stateSizeEstimates)
+                    .entrySet().stream()
+                    .map(e -> new AllocationScore(group.getId(), e.getKey(), e.getValue()))
+                    .forEach(scores::add);
+        }
+
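
The quoted diff is cut off just after the score queue is filled. As a generic illustration of the greedy matching such a queue enables (simplified stand-in types, not the PR's code): the highest score wins, and each group and each slot is assigned at most once.

```java
import java.util.*;

public class GreedyAssignmentSketch {
    record Score(String group, String slot, long score) {}

    // Drains the scores best-first; mutates freeSlots as slots are consumed.
    static Map<String, String> assign(Collection<Score> scores, Set<String> freeSlots) {
        PriorityQueue<Score> queue =
                new PriorityQueue<>(Comparator.comparingLong(Score::score).reversed());
        queue.addAll(scores);
        Map<String, String> assignment = new HashMap<>();
        while (!queue.isEmpty()) {
            Score best = queue.poll();
            if (!assignment.containsKey(best.group()) && freeSlots.remove(best.slot())) {
                assignment.put(best.group(), best.slot()); // best local-state match first
            }
        }
        return assignment;
    }
}
```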
+

[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


dmvk commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1117208355


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.OperatorIDPair;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toMap;
+
+/** Managed Keyed State size estimates used to make scheduling decisions. */
+class StateSizeEstimates {
+    private final Map<JobVertexID, Long> averages;
+
+    public StateSizeEstimates() {
+        this(Collections.emptyMap());
+    }
+
+    public StateSizeEstimates(Map<JobVertexID, Long> averages) {
+        this.averages = averages;
+    }
+
+    public Optional<Long> estimate(JobVertexID jobVertexId) {
+        return Optional.ofNullable(averages.get(jobVertexId));
+    }
+
+    static StateSizeEstimates empty() {
+        return new StateSizeEstimates();
+    }
+
+    static StateSizeEstimates fromGraph(@Nullable ExecutionGraph executionGraph) {
+        return Optional.ofNullable(executionGraph)
+                .flatMap(graph -> Optional.ofNullable(graph.getCheckpointCoordinator()))
+                .flatMap(coordinator -> Optional.ofNullable(coordinator.getCheckpointStore()))
+                .flatMap(store -> Optional.ofNullable(store.getLatestCheckpoint()))
+                .map(cp -> build(fromCompletedCheckpoint(cp), mapVerticesToOperators(executionGraph)))
+                .orElse(empty());
+    }
+
+    private static StateSizeEstimates build(
+            Map<OperatorID, Long> sizePerOperator,
+            Map<JobVertexID, Set<OperatorID>> verticesToOperators) {
+        Map<JobVertexID, Long> verticesToSizes =
+                verticesToOperators.entrySet().stream()
+                        .collect(toMap(Map.Entry::getKey, e -> size(e.getValue(), sizePerOperator)));
+        return new StateSizeEstimates(verticesToSizes);
+    }
+
+    private static long size(Set<OperatorID> ids, Map<OperatorID, Long> sizes) {
+        return ids.stream()
+                .mapToLong(key -> sizes.getOrDefault(key, 0L))
+                .boxed()
+                .reduce(Long::sum)
+                .orElse(0L);
+    }
+
+    private static Map<JobVertexID, Set<OperatorID>> mapVerticesToOperators(
+            ExecutionGraph executionGraph) {
+        return executionGraph.getAllVertices().entrySet().stream()
+                .collect(toMap(Map.Entry::getKey, e -> getOperatorIDS(e.getValue())));
+    }
+
+    private static Set<OperatorID> getOperatorIDS(ExecutionJobVertex v) {
+        return v.getOperatorIDs().stream()
+                .map(OperatorIDPair::getGeneratedOperatorID)
+                .collect(Collectors.toSet());
+    }
+
+    private static Map<OperatorID, Long> fromCompletedCheckpoint(CompletedCheckpoint cp) {
+        Stream<Map.Entry<OperatorID, OperatorState>> states =
+                cp.getOperatorStates().entrySet().stream();
+        Map<OperatorID, Long> estimates =
+                states.collect(
+                        toMap(Map.Entry::getKey, e -> estimateKeyGroupStateSize(e.getValue())));
+        return estimates;
+    }
+
+    private static long estimateKeyGroupStateSize(OperatorState state) {
+        Stream<KeyedStateHandle> handles =
+                state.getSubtaskStates().values().stream()
+                        .flatMap(s -> s.getManagedKeyedState().stream());
+        Stream<Tuple2<Long, Integer>> sizeAndCount =
+                handles.map(
+                        h ->

[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


dmvk commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1117202797


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateSizeEstimates.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.OperatorIDPair;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toMap;
+
+/** Managed Keyed State size estimates used to make scheduling decisions. */
+class StateSizeEstimates {
+    private final Map<JobVertexID, Long> averages;
+
+    public StateSizeEstimates() {
+        this(Collections.emptyMap());
+    }
+
+    public StateSizeEstimates(Map<JobVertexID, Long> averages) {
+        this.averages = averages;
+    }
+
+    public Optional<Long> estimate(JobVertexID jobVertexId) {
+        return Optional.ofNullable(averages.get(jobVertexId));
+    }
+
+    static StateSizeEstimates empty() {
+        return new StateSizeEstimates();
+    }
+
+    static StateSizeEstimates fromGraph(@Nullable ExecutionGraph executionGraph) {
+        return Optional.ofNullable(executionGraph)
+                .flatMap(graph -> Optional.ofNullable(graph.getCheckpointCoordinator()))
+                .flatMap(coordinator -> Optional.ofNullable(coordinator.getCheckpointStore()))
+                .flatMap(store -> Optional.ofNullable(store.getLatestCheckpoint()))
+                .map(
+                        cp ->
+                                build(
+                                        fromCompletedCheckpoint(cp),
+                                        mapVerticesToOperators(executionGraph)))
+                .orElse(empty());
+    }
+
+    private static StateSizeEstimates build(
+            Map<OperatorID, Long> sizePerOperator,
+            Map<JobVertexID, Set<OperatorID>> verticesToOperators) {
+        Map<JobVertexID, Long> verticesToSizes =
+                verticesToOperators.entrySet().stream()
+                        .collect(
+                                toMap(Map.Entry::getKey, e -> size(e.getValue(), sizePerOperator)));
+        return new StateSizeEstimates(verticesToSizes);
+    }
+
+    private static long size(Set<OperatorID> ids, Map<OperatorID, Long> sizes) {
+        return ids.stream()
+                .mapToLong(key -> sizes.getOrDefault(key, 0L))
+                .boxed()
+                .reduce(Long::sum)
+                .orElse(0L);
+    }
+
+    private static Map<JobVertexID, Set<OperatorID>> mapVerticesToOperators(
+            ExecutionGraph executionGraph) {
+        return executionGraph.getAllVertices().entrySet().stream()
+                .collect(toMap(Map.Entry::getKey, e -> getOperatorIDS(e.getValue())));
+    }
+
+    private static Set<OperatorID> getOperatorIDS(ExecutionJobVertex v) {
+        return v.getOperatorIDs().stream()
+                .map(OperatorIDPair::getGeneratedOperatorID)
+                .collect(Collectors.toSet());
+    }
+
+    private static Map<OperatorID, Long> fromCompletedCheckpoint(CompletedCheckpoint cp) {
+        Stream<Map.Entry<OperatorID, OperatorState>> states =
+                cp.getOperatorStates().entrySet().stream();
+        Map<OperatorID, Long> estimates =
+                states.collect(
+                        toMap(Map.Entry::getKey, e -> estimateKeyGroupStateSize(e.getValue())));
+        return estimates;
+    }
+
+    private static long estimateKeyGroupStateSize(OperatorState state) {
+        Stream<KeyedStateHandle> handles =
+                state.getSubtaskStates().values().stream()
+                        .flatMap(s -> s.getManagedKeyedState().stream());
+        Stream<Tuple2<Long, Integer>> sizeAndCount =
+                handles.map(
+                        h ->
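
For readers skimming the thread: the class above boils down to summing the known per-operator state sizes for each job vertex. A minimal, self-contained sketch of that aggregation, with plain strings standing in for Flink's OperatorID/JobVertexID and made-up names and sizes:

```java
import java.util.Map;
import java.util.Set;

public class StateSizeEstimateSketch {

    // Sum the known operator state sizes of one vertex; unknown operators count as 0,
    // mirroring sizes.getOrDefault(key, 0L) in the diff above.
    static long vertexSize(Set<String> operatorIds, Map<String, Long> sizePerOperator) {
        return operatorIds.stream().mapToLong(id -> sizePerOperator.getOrDefault(id, 0L)).sum();
    }

    public static void main(String[] args) {
        Map<String, Long> sizePerOperator = Map.of("op-1", 1024L, "op-2", 2048L);
        Map<String, Set<String>> verticesToOperators =
                Map.of("vertex-A", Set.of("op-1", "op-2"), "vertex-B", Set.of("op-3"));
        verticesToOperators.forEach(
                (vertex, ops) ->
                        System.out.println(vertex + " -> " + vertexSize(ops, sizePerOperator)));
        // vertex-A -> 3072, vertex-B -> 0
    }
}
```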

[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


dmvk commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1117189073


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.createExecutionSlotSharingGroups;
+
+/** A {@link SlotAssigner} that assigns slots based on the number of local key groups. */
+@Internal
+public class StateLocalitySlotAssigner implements SlotAssigner {
+
+    private static class AllocationScore implements Comparable<AllocationScore> {
+
+        private final String group;
+        private final AllocationID allocationId;
+
+        public AllocationScore(String group, AllocationID allocationId, long score) {
+            this.group = group;
+            this.allocationId = allocationId;
+            this.score = score;
+        }
+
+        private final long score;
+
+        public String getGroup() {
+            return group;
+        }
+
+        public AllocationID getAllocationId() {
+            return allocationId;
+        }
+
+        public long getScore() {
+            return score;
+        }
+
+        @Override
+        public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
+            int result = Long.compare(score, other.score);
+            if (result != 0) {
+                return result;
+            }
+            result = other.allocationId.compareTo(allocationId);
+            if (result != 0) {
+                return result;
+            }
+            return other.group.compareTo(group);
+        }
+    }
+
+    @Override
+    public Collection<SlotAssignment> assignSlots(
+            JobInformation jobInformation,
+            Collection<? extends SlotInfo> freeSlots,
+            VertexParallelism vertexParallelism,
+            AllocationsInfo previousAllocations,
+            StateSizeEstimates stateSizeEstimates) {
+        final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+        for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
+            allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
+        }
+        final Map<JobVertexID, Integer> parallelism = getParallelism(allGroups);
+
+        // PQ orders the pairs (allocationID, groupID) by score, decreasing
+        // the score is computed as the potential amount of state that would reside locally
+        final PriorityQueue<AllocationScore> scores =
+                new PriorityQueue<>(Comparator.reverseOrder());
+        for (ExecutionSlotSharingGroup group : allGroups) {
+            calculateScore(
+                            group,
+                            parallelism,
+                            jobInformation,
+                            previousAllocations,
+                            stateSizeEstimates)
+                    .entrySet().stream()
+                    .map(e -> new AllocationScore(group.getId(), e.getKey(), e.getValue()))
+                    .forEach(scores::add);
+        }
+
+
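
A minimal sketch of the "order (allocationID, groupID) pairs by score, then pick greedily" idea from the snippet above, with strings in place of Flink's ID types and made-up scores:

```java
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;

public class GreedyScoreAssignmentSketch {

    record Score(String group, String allocationId, long score) {}

    public static void main(String[] args) {
        // highest score first, matching PriorityQueue<>(Comparator.reverseOrder()) above
        PriorityQueue<Score> scores =
                new PriorityQueue<>((a, b) -> Long.compare(b.score(), a.score()));
        scores.add(new Score("g1", "alloc-1", 30));
        scores.add(new Score("g1", "alloc-2", 10));
        scores.add(new Score("g2", "alloc-2", 20));

        Set<String> assignedGroups = new HashSet<>();
        Set<String> usedAllocations = new HashSet<>();
        for (Score s; (s = scores.poll()) != null; ) {
            // take the best remaining pair whose group and slot are both still free
            if (!assignedGroups.contains(s.group()) && !usedAllocations.contains(s.allocationId())) {
                assignedGroups.add(s.group());
                usedAllocations.add(s.allocationId());
                System.out.println(s.group() + " -> " + s.allocationId());
            }
        }
        // prints: g1 -> alloc-1, then g2 -> alloc-2
    }
}
```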

[GitHub] [flink] dmvk commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


dmvk commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1117187236


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.AllocationsInfo.JobVertexAllocationInfo;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.createExecutionSlotSharingGroups;
+
+/** A {@link SlotAssigner} that assigns slots based on the number of local key groups. */
+@Internal
+public class StateLocalitySlotAssigner implements SlotAssigner {
+
+    private static class AllocationScore implements Comparable<AllocationScore> {
+
+        private final String group;

Review Comment:
   ```suggestion
   private final String groupId;
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-web] mxm commented on a diff in pull request #613: Kubernetes Operator 1.4.0 blogpost

2023-02-24 Thread via GitHub


mxm commented on code in PR #613:
URL: https://github.com/apache/flink-web/pull/613#discussion_r1117121658


##
docs/content/posts/2023-02-27-release-kubernetes-operator-1.4.0.md:
##
@@ -0,0 +1,77 @@
+---
+title:  "Apache Flink Kubernetes Operator 1.4.0 Release Announcement"
+date: "2023-02-27T08:00:00.000Z"
+authors:
+- gyfora:
+  name: "Gyula Fora"
+  twitter: "GyulaFora"
+- mxm:
+  name: "Max Michels"
+  twitter: "stadtlegende"
+aliases:
+- /news/2023/02/27/release-kubernetes-operator-1.4.0.html
+---
+
+We are proud to announce the latest stable release of the operator. In 
addition to the expected stability improvements and fixes, the 1.4.0 release 
introduces the first version of the long-awaited autoscaler module.
+
+## Flink Streaming Job Autoscaler
+
+A highly requested feature for Flink applications is the ability to scale the 
pipeline based on incoming data load and other performance metrics. While Flink 
has already provided some of the required building blocks, this feature has not 
yet been realised in the open source ecosystem.

Review Comment:
   ```suggestion
   A highly requested feature for Flink applications is the ability to scale 
the pipeline based on incoming data load and the utilization of the dataflow. 
While Flink has already provided some of the required building blocks, this 
feature has not yet been realized in the open source ecosystem.
   ```



##
docs/content/posts/2023-02-27-release-kubernetes-operator-1.4.0.md:
##
@@ -0,0 +1,77 @@
+---
+title:  "Apache Flink Kubernetes Operator 1.4.0 Release Announcement"
+date: "2023-02-27T08:00:00.000Z"
+authors:
+- gyfora:
+  name: "Gyula Fora"
+  twitter: "GyulaFora"
+- mxm:
+  name: "Max Michels"

Review Comment:
   NIT 
   ```suggestion
 name: "Maximilian Michels"
   ```



##
docs/content/posts/2023-02-27-release-kubernetes-operator-1.4.0.md:
##
@@ -0,0 +1,77 @@
+---
+title:  "Apache Flink Kubernetes Operator 1.4.0 Release Announcement"
+date: "2023-02-27T08:00:00.000Z"
+authors:
+- gyfora:
+  name: "Gyula Fora"
+  twitter: "GyulaFora"
+- mxm:
+  name: "Max Michels"
+  twitter: "stadtlegende"
+aliases:
+- /news/2023/02/27/release-kubernetes-operator-1.4.0.html
+---
+
+We are proud to announce the latest stable release of the operator. In 
addition to the expected stability improvements and fixes, the 1.4.0 release 
introduces the first version of the long-awaited autoscaler module.
+
+## Flink Streaming Job Autoscaler
+
+A highly requested feature for Flink applications is the ability to scale the 
pipeline based on incoming data load and other performance metrics. While Flink 
has already provided some of the required building blocks, this feature has not 
yet been realised in the open source ecosystem.
+
+With 
[FLIP-271](https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling)
 the community set out to build such an autoscaler component as part of the 
Kubernetes Operator subproject. The Kubernetes Operator proved to be a great 
place for the autoscaler module as it already contains all the necessary bits 
for managing and upgrading production streaming applications.
+
+Fast-forward to the 1.4.0 release and we now have the first fully functional 
autoscaler implementation in the operator, ready to be tested and used in 
production applications. For more detailed information, please refer to the 
[Autoscaler 
Documentation](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.4/docs/custom-resource/autoscaler/).
+
+### Overview
+
+The autoscaler uses Flink source and operator vertex metrics to efficiently 
and independently scale the job vertexes of the streaming pipeline.
+

Review Comment:
   ```suggestion
   The used metrics include: 
   Source metrics:
   - number of pending records (source only)
   - number of partitions (source only)
   - ingestion rate (source only)
   - processing rate
   - time spent processing (utilization)
   ```



##
docs/content/posts/2023-02-27-release-kubernetes-operator-1.4.0.md:
##
@@ -0,0 +1,77 @@
+---
+title:  "Apache Flink Kubernetes Operator 1.4.0 Release Announcement"
+date: "2023-02-27T08:00:00.000Z"
+authors:
+- gyfora:
+  name: "Gyula Fora"
+  twitter: "GyulaFora"
+- mxm:
+  name: "Max Michels"
+  twitter: "stadtlegende"
+aliases:
+- /news/2023/02/27/release-kubernetes-operator-1.4.0.html
+---
+
+We are proud to announce the latest stable release of the operator. In 
addition to the expected stability improvements and fixes, the 1.4.0 release 
introduces the first version of the long-awaited autoscaler module.
+
+## Flink Streaming Job Autoscaler
+
+A highly requested feature for Flink applications is the ability to scale the 
pipeline based on incoming data load and other performance metrics. While Flink 
has already provided some of the required building blocks, this feature has not 
yet been realised in the open source ecosystem.
+
+With 

[jira] [Created] (FLINK-31217) Update netty to current

2023-02-24 Thread Morey Straus (Jira)
Morey Straus created FLINK-31217:


 Summary: Update netty to current
 Key: FLINK-31217
 URL: https://issues.apache.org/jira/browse/FLINK-31217
 Project: Flink
  Issue Type: Technical Debt
Reporter: Morey Straus


Netty 3.10.6 has several vulnerabilities and needs updating to the current 
release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31216) Update kryo to current

2023-02-24 Thread Morey Straus (Jira)
Morey Straus created FLINK-31216:


 Summary: Update kryo to current
 Key: FLINK-31216
 URL: https://issues.apache.org/jira/browse/FLINK-31216
 Project: Flink
  Issue Type: Technical Debt
Reporter: Morey Straus


kryo 2.24 is several years out of date and has a deserialization vulnerability 
associated with it. Please update to the current release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31215) Backpropagate processing rate limits from non-scalable bottlenecks to upstream operators

2023-02-24 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-31215:
--

 Summary: Backpropagate processing rate limits from non-scalable 
bottlenecks to upstream operators
 Key: FLINK-31215
 URL: https://issues.apache.org/jira/browse/FLINK-31215
 Project: Flink
  Issue Type: New Feature
  Components: Autoscaler, Kubernetes Operator
Reporter: Gyula Fora


The current algorithm scales operators based on input data rates by propagating 
them forward through the graph.

However, there are cases where a certain operator's processing capacity is 
limited, either because it has a set maxParallelism or because the user excludes 
it from scaling (or the capacity otherwise doesn't increase with scaling).

In these cases it doesn't make sense to scale upstream operators to the target 
data rate if the job is going to be bottlenecked by a downstream operator. 
Instead, we should backpropagate the limit based on the non-scalable bottleneck, 
as the sketch below illustrates.
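
A minimal sketch of the backpropagation idea, not the operator's actual algorithm: walk a simple operator chain backwards and cap each upstream target rate at the tightest downstream capacity (all numbers are made up):

{code:java}
import java.util.Arrays;

public class BackpropagationSketch {
    public static void main(String[] args) {
        // forward-propagated target rates per operator in a chain, records/sec
        double[] targetRate = {1000, 1000, 1000};
        // max processing capacity; operator 2 is a non-scalable bottleneck at 400/s
        double[] capacity = {Double.MAX_VALUE, Double.MAX_VALUE, 400};

        double limit = Double.MAX_VALUE;
        for (int i = targetRate.length - 1; i >= 0; i--) {
            limit = Math.min(limit, capacity[i]);
            targetRate[i] = Math.min(targetRate[i], limit);
        }
        // upstream operators are now capped at 400/s instead of being over-scaled
        System.out.println(Arrays.toString(targetRate)); // [400.0, 400.0, 400.0]
    }
}
{code}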



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30571) Compute scale parallelism based on observed scalability

2023-02-24 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-30571:
--

Assignee: (was: Gyula Fora)

> Compute scale parallelism based on observed scalability 
> 
>
> Key: FLINK-30571
> URL: https://issues.apache.org/jira/browse/FLINK-30571
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Gyula Fora
>Priority: Major
>
> When computing target parallelism for job vertices, we currently assume linear 
> scaling with a fixed (1) coefficient.
> This assumes that in order to double the capacity we simply double the 
> parallelism.
> While linearity might already be violated by many real-time workloads, this 
> form of strong linearity rarely holds due to the overhead of increased 
> network traffic, coordination, etc.
> As we can access past (parallelism, processingRate) information from the 
> scaling history, we should estimate the scalability coefficient using either a 
> simple or a weighted linear regression, as sketched below.
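
A minimal sketch of the proposed estimation, assuming a linear-through-origin model rate ~ coefficient * parallelism and a weighted least-squares fit; weights and sample data are made up:

{code:java}
public class ScalabilityRegressionSketch {

    // Weighted least squares through the origin: coefficient = sum(w*p*r) / sum(w*p^2)
    static double estimateCoefficient(double[] parallelism, double[] rate, double[] weight) {
        double num = 0, den = 0;
        for (int i = 0; i < parallelism.length; i++) {
            num += weight[i] * parallelism[i] * rate[i];
            den += weight[i] * parallelism[i] * parallelism[i];
        }
        return num / den;
    }

    public static void main(String[] args) {
        double[] p = {2, 4, 8};
        double[] r = {190, 350, 600}; // sub-linear scaling
        double[] w = {1, 2, 4};       // weight recent observations higher
        System.out.println(estimateCoefficient(p, r, w)); // noticeably below 95 (= 190/2)
    }
}
{code}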



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] Samrat002 commented on a diff in pull request #21770: [FLINK-30277][python]Allow PYTHONPATH of Python Worker configurable

2023-02-24 Thread via GitHub


Samrat002 commented on code in PR #21770:
URL: https://github.com/apache/flink/pull/21770#discussion_r1117029594


##
flink-python/src/main/java/org/apache/flink/python/PythonOptions.java:
##
@@ -122,6 +122,19 @@ public class PythonOptions {
 + "optional parameter exists. The option 
is equivalent to the command line option "
 + "\"-pyreq\".");
 
+/** The configuration allows user to define python path for client and 
workers. */
+public static final ConfigOption<String> PYTHON_PATH =
+ConfigOptions.key("env.PYTHONPATH")
+.stringType()
+.noDefaultValue()
+.withDescription(
+Description.builder()
+.text(
+"Specify the path on the Worker 
Node where the Flink Python Dependencies are installed, which "
++ "gets added into the 
PYTHONPATH of the Python Worker. "
++ "The option is 
equivalent to the command line option \"-penv.PYTHONPATH\".")

Review Comment:
   created a separate sub tasks to provide command line support 
[FLINK-31214](https://issues.apache.org/jira/browse/FLINK-31214). 
   Will raise a different pr for the same . 
   
   Currently removing it from description
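
For context, a minimal sketch of how the option from the diff above would be set programmatically; the key is taken from the diff, the path value is made up:

```java
import org.apache.flink.configuration.Configuration;

public class PythonPathConfigDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // "env.PYTHONPATH" is the key proposed in this PR; the value is illustrative
        conf.setString("env.PYTHONPATH", "/opt/python/site-packages");
        System.out.println(conf);
    }
}
```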



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-31214) Add support for new command line option -py.pythonpath

2023-02-24 Thread Samrat Deb (Jira)
Samrat Deb created FLINK-31214:
--

 Summary: Add support for new command line option -py.pythonpath
 Key: FLINK-31214
 URL: https://issues.apache.org/jira/browse/FLINK-31214
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Samrat Deb
 Fix For: 1.17.0, 1.18.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] Samrat002 commented on a diff in pull request #21770: [FLINK-30277][python]Allow PYTHONPATH of Python Worker configurable

2023-02-24 Thread via GitHub


Samrat002 commented on code in PR #21770:
URL: https://github.com/apache/flink/pull/21770#discussion_r1117029594


##
flink-python/src/main/java/org/apache/flink/python/PythonOptions.java:
##
@@ -122,6 +122,19 @@ public class PythonOptions {
 + "optional parameter exists. The option 
is equivalent to the command line option "
 + "\"-pyreq\".");
 
+/** The configuration allows user to define python path for client and 
workers. */
+public static final ConfigOption<String> PYTHON_PATH =
+ConfigOptions.key("env.PYTHONPATH")
+.stringType()
+.noDefaultValue()
+.withDescription(
+Description.builder()
+.text(
+"Specify the path on the Worker 
Node where the Flink Python Dependencies are installed, which "
++ "gets added into the 
PYTHONPATH of the Python Worker. "
++ "The option is 
equivalent to the command line option \"-penv.PYTHONPATH\".")

Review Comment:
   yes ! wanted to add commandline option for user. i will create a separate 
jira for that purpose . 
   i will remove the description for now 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] chucheng92 commented on pull request #21904: [FLINK-30935][connectors/kafka] Add Kafka serializers deserialize check when using SimpleVersionedSerializer

2023-02-24 Thread via GitHub


chucheng92 commented on PR #21904:
URL: https://github.com/apache/flink/pull/21904#issuecomment-1443789397

   Hi @twalthr Timo, could you help me review this? It's a small improvement.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gauravmiglanid11 commented on a diff in pull request #533: [FLINK-30920] [AutoScaler] adds exclude vertex ids for autoscaler to ignore for evaluation and scaling

2023-02-24 Thread via GitHub


gauravmiglanid11 commented on code in PR #533:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/533#discussion_r1117117865


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java:
##
@@ -222,19 +223,30 @@ private Map<JobVertexID, ScalingSummary> computeScalingSummary(
             Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) {
 
         var out = new HashMap<JobVertexID, ScalingSummary>();
+        var excludeVertexIdList = conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
         evaluatedMetrics.forEach(
                 (v, metrics) -> {
-                    var currentParallelism =
-                            (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
-                    var newParallelism =
-                            jobVertexScaler.computeScaleTargetParallelism(
-                                    resource,
-                                    conf,
+                    if (excludeVertexIdList.contains(v.toHexString())) {
+                        LOG.debug(
+                                "Vertex {} is part of `vertex.exclude.ids` config, Ignoring it for scaling",
+                                v);

Review Comment:
   got the issue, thanks, fixing it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] mxm commented on a diff in pull request #533: [FLINK-30920] [AutoScaler] adds exclude vertex ids for autoscaler to ignore for evaluation and scaling

2023-02-24 Thread via GitHub


mxm commented on code in PR #533:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/533#discussion_r1117086633


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java:
##
@@ -222,19 +223,30 @@ private Map<JobVertexID, ScalingSummary> computeScalingSummary(
             Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) {
 
         var out = new HashMap<JobVertexID, ScalingSummary>();
+        var excludeVertexIdList = conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
         evaluatedMetrics.forEach(
                 (v, metrics) -> {
-                    var currentParallelism =
-                            (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
-                    var newParallelism =
-                            jobVertexScaler.computeScaleTargetParallelism(
-                                    resource,
-                                    conf,
+                    if (excludeVertexIdList.contains(v.toHexString())) {
+                        LOG.debug(
+                                "Vertex {} is part of `vertex.exclude.ids` config, Ignoring it for scaling",
+                                v);

Review Comment:
   Can we make this info level?



##
flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java:
##
@@ -183,6 +184,29 @@ op1, evaluated(1, 70, 100),
 
assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, 
scalingSummary));
 }
 
+@Test
+public void testVertexesExclusionForScaling() {
+var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a";
+var source = JobVertexID.fromHexString(sourceHexString);
+var filterOperatorHexString = "869fb403873411306404e9f2e4438c0e";
+var filterOperator = 
JobVertexID.fromHexString(filterOperatorHexString);
+var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
+var sink = JobVertexID.fromHexString(sinkHexString);
+
+var scalingInfo = new AutoScalerInfo(new HashMap<>());
+var metrics =
+Map.of(
+source,
+evaluated(1, 70, 100),
+filterOperator,
+evaluated(1, 100, 80),
+sink,
+evaluated(1, 70, 80));
+// filter operator should not scale
+conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, 
List.of(filterOperatorHexString));
+assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, 
scalingInfo, conf, metrics));

Review Comment:
   Can we test that it would have scaled if the configuration was not set?



##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java:
##
@@ -222,19 +223,30 @@ private Map<JobVertexID, ScalingSummary> computeScalingSummary(
             Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) {
 
         var out = new HashMap<JobVertexID, ScalingSummary>();
+        var excludeVertexIdList = conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
         evaluatedMetrics.forEach(
                 (v, metrics) -> {
-                    var currentParallelism =
-                            (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
-                    var newParallelism =
-                            jobVertexScaler.computeScaleTargetParallelism(
-                                    resource,
-                                    conf,
+                    if (excludeVertexIdList.contains(v.toHexString())) {
+                        LOG.debug(
+                                "Vertex {} is part of `vertex.exclude.ids` config, Ignoring it for scaling",
+                                v);

Review Comment:
   This still requires more work to ensure the feature works for non-sink 
vertices. If you ignore a non-sink vertex, the downstream operators from it 
might be scaled based on the target rates calculated for the ignored vertex. We 
need to make sure to set the target rates based on the current rate for the 
ignored vertex. Here: 
https://github.com/apache/flink-kubernetes-operator/blob/c6cd7cbb83c5257960daebe65214035a6732ad0d/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java#L91
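
A minimal sketch of the point above, with hypothetical vertex names and made-up rates: for an excluded vertex, the target rate fed downstream falls back to the vertex's current rate, otherwise downstream vertices would be scaled for throughput the excluded vertex can never produce:

```java
import java.util.Set;

public class ExcludedVertexRateSketch {

    static double targetRateFor(
            String vertexId, double computedTarget, double currentRate, Set<String> excludedIds) {
        return excludedIds.contains(vertexId) ? currentRate : computedTarget;
    }

    public static void main(String[] args) {
        Set<String> excluded = Set.of("filter-vertex");
        // the autoscaler wants 1000/s, but the excluded vertex only does 400/s
        System.out.println(targetRateFor("filter-vertex", 1000, 400, excluded)); // 400.0
        System.out.println(targetRateFor("sink-vertex", 1000, 400, excluded)); // 1000.0
    }
}
```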



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-24 Thread via GitHub


RocMarshal commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1117078618


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -362,30 +358,35 @@ private void testSyncSavepointWithEndInput(
 "savepointResult");
 harness.processAll();
 
-Assert.assertEquals(expectEndInput, 
TestBoundedOneInputStreamOperator.isInputEnded());
+
assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput);
 }
 
 @Test
-public void testCleanUpExceptionSuppressing() throws Exception {
+void testCleanUpExceptionSuppressing() throws Exception {
 try (StreamTaskMailboxTestHarness<String> testHarness =
 new 
StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
 .addInput(STRING_TYPE_INFO)
 .setupOutputForSingletonOperatorChain(new 
FailingTwiceOperator())
 .build()) {
 
-try {
-testHarness.processElement(new StreamRecord<>("Doesn't 
matter", 0));
-throw new RuntimeException("Expected an exception but ran 
successfully");
-} catch (Exception ex) {
-ExceptionUtils.assertThrowable(ex, 
ExpectedTestException.class);
-}
+assertThatThrownBy(
+() ->
+testHarness.processElement(
+new StreamRecord<>("Doesn't 
matter", 0)))
+.satisfies(
+(Consumer<Throwable>)
+throwable ->
+ExceptionUtils.assertThrowable(
+throwable, 
ExpectedTestException.class));
 
-try {
-testHarness.finishProcessing();
-} catch (Exception ex) {
-// todo: checking for suppression if there are more exceptions 
during cleanup
-ExceptionUtils.assertThrowable(ex, 
FailingTwiceOperator.CloseException.class);
-}
+// todo: checking for suppression if there are more exceptions 
during cleanup
+assertThatThrownBy(testHarness::finishProcessing)
+.satisfies(
+(ThrowingConsumer<Throwable>)
+throwable ->
+ExceptionUtils.assertThrowable(
+throwable,
+
FailingTwiceOperator.CloseException.class));

Review Comment:
   https://user-images.githubusercontent.com/64569824/221202347-41aeb24a-2324-46cb-9985-0a3e5595faf6.png
   
   So, let's keep the current lines ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31066) Release Testing: Verify FLINK-29932 Upgrade Calcite to 1.29.0

2023-02-24 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693216#comment-17693216
 ] 

Matthias Pohl commented on FLINK-31066:
---

I ran the sample code (once compiled with Flink release-1.16 and once compiled 
with release-1.17) on a Flink cluster based on the binaries of 1.16.1 and 
1.17-SNAPSHOT (commit 21158c06).

The only config change I added was the following:
{code}
state.backend: hashmap
state.backend.type: filesystem
{code}

> Release Testing: Verify FLINK-29932 Upgrade Calcite to 1.29.0
> -
>
> Key: FLINK-31066
> URL: https://issues.apache.org/jira/browse/FLINK-31066
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Sergey Nuyanzin
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: release-testing
> Attachments: SavepointReleaseTesting.java
>
>
> In fact this is a task to check all 3 Calcite upgrade related issues (1.27.0, 
> 1.28.0 and 1.29.0)
> Since there were added optimization for Sarg in Calcite 1.27.0 it would make 
> sense to check that different queries with Sarg operator are working ok.
> Also would make sense to check that SQL jobs with Sarg related queries could 
> be restored from previous Flink version.
> An example of SQL 
> {code:sql}
> SELECT a FROM MyTable WHERE a = 1 or a = 2 or a IS NOT NULL;{code}
> {code:sql}
> SELECT a FROM MyTable WHERE a = 1 or a = 2 or a IS NULL;
> {code}
> where MyTable is for instance
> {code:sql}
> CREATE TABLE MyTable (
>    a bigint,
>    b int not null,
>    c varchar,
>    d timestamp(3)
> ) with (...) 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] snuyanzin commented on a diff in pull request #21993: [FLINK-31166][table] Fix array_contains does not support null argumen…

2023-02-24 Thread via GitHub


snuyanzin commented on code in PR #21993:
URL: https://github.com/apache/flink/pull/21993#discussion_r1117070060


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java:
##
@@ -71,7 +71,11 @@ public final class SpecificInputTypeStrategies {
 
 /** Argument type derived from the array element type. */
 public static final ArgumentTypeStrategy ARRAY_ELEMENT_ARG =
-new ArrayElementArgumentTypeStrategy();
+new ArrayElementArgumentTypeStrategy(false);
+
+/** Argument type derived from the array element type. But leaves 
nullability untouched. */

Review Comment:
   "preserving" means keeping same.
   With current implementation if an input array is an array of not null ints 
and input parameter for  `ArrayElementArgumentTypeStrategy` constructor is 
`true` then it will force set to nullable.
   
   So it does different thing and not preserving
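
A small sketch of the distinction, using Flink's DataTypes (assumes flink-table-api-java on the classpath): "preserving" would keep the element type's nullability untouched, while the current boolean flag force-sets it to nullable:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class ElementNullabilityDemo {
    public static void main(String[] args) {
        DataType array = DataTypes.ARRAY(DataTypes.INT().notNull());
        DataType element = array.getChildren().get(0); // INT NOT NULL

        DataType preserved = element; // preserving: stays INT NOT NULL
        DataType forcedNullable = element.nullable(); // forcing: becomes INT

        System.out.println(preserved);
        System.out.println(forcedNullable);
    }
}
```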



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31066) Release Testing: Verify FLINK-29932 Upgrade Calcite to 1.29.0

2023-02-24 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-31066:
--
Attachment: SavepointReleaseTesting.java

> Release Testing: Verify FLINK-29932 Upgrade Calcite to 1.29.0
> -
>
> Key: FLINK-31066
> URL: https://issues.apache.org/jira/browse/FLINK-31066
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Sergey Nuyanzin
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: release-testing
> Attachments: SavepointReleaseTesting.java
>
>
> In fact this is a task to check all 3 Calcite upgrade related issues (1.27.0, 
> 1.28.0 and 1.29.0)
> Since there were added optimization for Sarg in Calcite 1.27.0 it would make 
> sense to check that different queries with Sarg operator are working ok.
> Also would make sense to check that SQL jobs with Sarg related queries could 
> be restored from previous Flink version.
> An example of SQL 
> {code:sql}
> SELECT a FROM MyTable WHERE a = 1 or a = 2 or a IS NOT NULL;{code}
> {code:sql}
> SELECT a FROM MyTable WHERE a = 1 or a = 2 or a IS NULL;
> {code}
> where MyTable is for instance
> {code:sql}
> CREATE TABLE MyTable (
>    a bigint,
>    b int not null,
>    c varchar,
>    d timestamp(3)
> ) with (...) 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-24 Thread via GitHub


RocMarshal commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1117066514


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -1103,35 +1143,41 @@ private void 
testFailToConfirmCheckpointMessage(Consumer> consu
  * finished.
  */
 @Test
-public void testCheckpointFailueOnClosedOperator() throws Throwable {
+void testCheckpointFailueOnClosedOperator() throws Exception {
 ClosingOperator<Integer> operator = new ClosingOperator<>();
 StreamTaskMailboxTestHarnessBuilder<Integer> builder =
 new StreamTaskMailboxTestHarnessBuilder<>(
 OneInputStreamTask::new, 
BasicTypeInfo.INT_TYPE_INFO)
 .addInput(BasicTypeInfo.INT_TYPE_INFO);
 try (StreamTaskMailboxTestHarness<Integer> harness =
 
builder.setupOutputForSingletonOperatorChain(operator).build()) {
-// keeps the mailbox from suspending
-harness.setAutoProcess(false);
-harness.processElement(new StreamRecord<>(1));
-
-harness.streamTask.operatorChain.finishOperators(
-harness.streamTask.getActionExecutor(), StopMode.DRAIN);
-harness.streamTask.operatorChain.closeAllOperators();
-assertTrue(ClosingOperator.closed.get());
-
-harness.streamTask.triggerCheckpointOnBarrier(
-new CheckpointMetaData(1, 0),
-CheckpointOptions.forCheckpointWithDefaultLocation(),
-new CheckpointMetricsBuilder());
-} catch (Exception ex) {
-ExceptionUtils.assertThrowableWithMessage(
-ex, "OperatorChain and Task should never be closed at this 
point");
+assertThatThrownBy(
+() -> {
+// keeps the mailbox from suspending
+harness.setAutoProcess(false);
+harness.processElement(new StreamRecord<>(1));
+
+
harness.streamTask.operatorChain.finishOperators(
+
harness.streamTask.getActionExecutor(), StopMode.DRAIN);
+
harness.streamTask.operatorChain.closeAllOperators();
+
assertThat(ClosingOperator.closed.get()).isTrue();

Review Comment:
   Yes, the small scope could help devs to locate the lines with expected 
exception. I'd like it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-31066) Release Testing: Verify FLINK-29932 Upgrade Calcite to 1.29.0

2023-02-24 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693212#comment-17693212
 ] 

Matthias Pohl edited comment on FLINK-31066 at 2/24/23 2:15 PM:


I attached to this Jira issue the job that I used to generate different 
execution plans for 1.16.1 and 1.17-SNAPSHOT.
1.16.1:
{code}
 [7]:Join(joinType=[InnerJoin], where=[(SEARCH(a, Sarg[100]) OR ((c = 200) AND 
(b = 400)))], select=[a, c, b], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+- StreamingFileWriter
   +- Sink: end
{code}
1.17-SNAPSHOT:
{code}
 [7]:Join(joinType=[InnerJoin], where=[((a = 100) OR ((c = 200) AND (b = 
400)))], select=[a, c, b], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+- StreamingFileWriter
   +- Sink: end
{code}

Restoring the job from a savepoint that was generated with 1.16.1 resulted in 
the following error:
{code}
./flink-1.17-21158c06-SNAPSHOT/bin/flink run -s 
/home/mapohl/research/FLINK-31066/run-1.16.1/savepoints/savepoint-ce617a-f674354c199c
 flink-examples-table_2.12-1.17-SNAPSHOT-SavepointReleaseTesting.jar 
$(pwd)/run-1.17-SNAPSHOT



 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Failed to execute sql
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
at 
org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at 
org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:938)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:989)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765)
at 
org.apache.flink.table.examples.java.basics.SavepointReleaseTesting.main(SavepointReleaseTesting.java:116)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 9 more
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 
'insert-into_default_catalog.default_database.sink_table'.
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2212)
at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:189)
at 
org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:921)
... 18 more
Caused by: java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at 
java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at 

[jira] [Commented] (FLINK-31066) Release Testing: Verify FLINK-29932 Upgrade Calcite to 1.29.0

2023-02-24 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693212#comment-17693212
 ] 

Matthias Pohl commented on FLINK-31066:
---

I attached to this Jira issue the job that I used to generate different 
execution plans for 1.16.1 and 1.17-SNAPSHOT.
1.16.1:
{code}

{code}
1.17-SNAPSHOT:
{code}

{code}

Restoring the job from a savepoint that was generated with 1.16.1 resulted in 
the following error:
{code}
./flink-1.17-21158c06-SNAPSHOT/bin/flink run -s 
/home/mapohl/research/FLINK-31066/run-1.16.1/savepoints/savepoint-ce617a-f674354c199c
 flink-examples-table_2.12-1.17-SNAPSHOT-SavepointReleaseTesting.jar 
$(pwd)/run-1.17-SNAPSHOT



 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Failed to execute sql
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
at 
org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at 
org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:938)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:989)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765)
at 
org.apache.flink.table.examples.java.basics.SavepointReleaseTesting.main(SavepointReleaseTesting.java:116)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 9 more
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 
'insert-into_default_catalog.default_database.sink_table'.
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2212)
at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:189)
at 
org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:921)
... 18 more
Caused by: java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at 
java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
not start the JobMaster.
at 
org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
at 

[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-24 Thread via GitHub


RocMarshal commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1117060917


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -1182,13 +1228,13 @@ public void testThreadInvariants() throws Throwable {
 });
 runningTask.invocationFuture.get();
 
-assertThat(
-runningTask.streamTask.getTaskClassLoader(), 
is(sameInstance(taskClassLoader)));
+
assertThat(runningTask.streamTask.getTaskClassLoader()).isSameAs(taskClassLoader);
+
assertThat(runningTask.streamTask.getTaskClassLoader()).isSameAs(taskClassLoader);

Review Comment:
   yes, nice catch! 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-24 Thread via GitHub


RocMarshal commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1117058961


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -1103,35 +1143,41 @@ private void 
testFailToConfirmCheckpointMessage(Consumer> consu
  * finished.
  */
 @Test
-public void testCheckpointFailueOnClosedOperator() throws Throwable {
+void testCheckpointFailueOnClosedOperator() throws Exception {
 ClosingOperator<Integer> operator = new ClosingOperator<>();
 StreamTaskMailboxTestHarnessBuilder<Integer> builder =
 new StreamTaskMailboxTestHarnessBuilder<>(
 OneInputStreamTask::new, 
BasicTypeInfo.INT_TYPE_INFO)
 .addInput(BasicTypeInfo.INT_TYPE_INFO);
 try (StreamTaskMailboxTestHarness<Integer> harness =
 
builder.setupOutputForSingletonOperatorChain(operator).build()) {
-// keeps the mailbox from suspending
-harness.setAutoProcess(false);
-harness.processElement(new StreamRecord<>(1));
-
-harness.streamTask.operatorChain.finishOperators(
-harness.streamTask.getActionExecutor(), StopMode.DRAIN);
-harness.streamTask.operatorChain.closeAllOperators();
-assertTrue(ClosingOperator.closed.get());
-
-harness.streamTask.triggerCheckpointOnBarrier(
-new CheckpointMetaData(1, 0),
-CheckpointOptions.forCheckpointWithDefaultLocation(),
-new CheckpointMetricsBuilder());
-} catch (Exception ex) {
-ExceptionUtils.assertThrowableWithMessage(
-ex, "OperatorChain and Task should never be closed at this 
point");
+assertThatThrownBy(
+() -> {
+// keeps the mailbox from suspending
+harness.setAutoProcess(false);
+harness.processElement(new StreamRecord<>(1));
+
+
harness.streamTask.operatorChain.finishOperators(
+
harness.streamTask.getActionExecutor(), StopMode.DRAIN);
+
harness.streamTask.operatorChain.closeAllOperators();
+
assertThat(ClosingOperator.closed.get()).isTrue();
+
+harness.streamTask.triggerCheckpointOnBarrier(
+new CheckpointMetaData(1, 0),
+
CheckpointOptions.forCheckpointWithDefaultLocation(),
+new CheckpointMetricsBuilder());
+})
+.satisfies(
+(Consumer<Throwable>)
+throwable ->
+
ExceptionUtils.assertThrowableWithMessage(
+throwable,
+"OperatorChain and Task 
should never be closed at this point"));

Review Comment:
   I can understand what you mean. I have a try on it with failed result, So, 
what about  keeping the current lines, please let me know what's your opinion.
   
   https://user-images.githubusercontent.com/64569824/221199128-34ed3b53-1106-4b1f-b77a-5e7124fbd20c.png
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Samrat002 commented on a diff in pull request #21770: [FLINK-30277][python]Allow PYTHONPATH of Python Worker configurable

2023-02-24 Thread via GitHub


Samrat002 commented on code in PR #21770:
URL: https://github.com/apache/flink/pull/21770#discussion_r1117042392


##
docs/layouts/shortcodes/generated/execution_config_configuration.html:
##
@@ -15,7 +15,7 @@
 The max number of async i/o operation that the async lookup 
join can trigger.
 
 
-table.exec.async-lookup.output-mode Batch Streaming
+table.exec.async-lookup.output-mode Batch Streaming

Review Comment:
   reverted



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-12451) [Bitwise Functions] Add BIT_XOR function supported in Table API and SQL

2023-02-24 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser reassigned FLINK-12451:
--

Assignee: Ran Tao

> [Bitwise Functions] Add BIT_XOR function supported in Table API and SQL
> ---
>
> Key: FLINK-12451
> URL: https://issues.apache.org/jira/browse/FLINK-12451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Zhanchun Zhang
>Assignee: Ran Tao
>Priority: Major
>  Labels: auto-unassigned
>
> Bitwise XOR, returns an unsigned 64-bit integer.
> eg:
> SELECT 1 ^ 1; returns 0
> SELECT 1 ^ 0; returns 1
> SELECT 11 ^ 3; returns 8



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] Samrat002 commented on a diff in pull request #21770: [FLINK-30277][python]Allow PYTHONPATH of Python Worker configurable

2023-02-24 Thread via GitHub


Samrat002 commented on code in PR #21770:
URL: https://github.com/apache/flink/pull/21770#discussion_r1117031077


##
flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java:
##
@@ -100,13 +104,32 @@ public PythonDependencyInfo(
 @Nullable String requirementsCacheDir,
 @Nonnull Map<String, String> archives,
 @Nonnull String pythonExec,
-@Nonnull String executionMode) {
+@Nullable String pythonPath) {
+this(
+pythonFiles,
+requirementsFilePath,
+requirementsCacheDir,
+archives,
+pythonExec,
+PYTHON_EXECUTION_MODE.defaultValue(),
+pythonPath);
+}

Review Comment:
   ack 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Samrat002 commented on a diff in pull request #21770: [FLINK-30277][python]Allow PYTHONPATH of Python Worker configurable

2023-02-24 Thread via GitHub


Samrat002 commented on code in PR #21770:
URL: https://github.com/apache/flink/pull/21770#discussion_r1117029594


##
flink-python/src/main/java/org/apache/flink/python/PythonOptions.java:
##
@@ -122,6 +122,19 @@ public class PythonOptions {
 + "optional parameter exists. The option 
is equivalent to the command line option "
 + "\"-pyreq\".");
 
+/** The configuration allows user to define python path for client and 
workers. */
+public static final ConfigOption<String> PYTHON_PATH =
+ConfigOptions.key("env.PYTHONPATH")
+.stringType()
+.noDefaultValue()
+.withDescription(
+Description.builder()
+.text(
+"Specify the path on the Worker 
Node where the Flink Python Dependencies are installed, which "
++ "gets added into the 
PYTHONPATH of the Python Worker. "
++ "The option is 
equivalent to the command line option \"-penv.PYTHONPATH\".")

Review Comment:
   yes 
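
   For illustration, one way the new option could be set programmatically; a sketch that assumes only the `env.PYTHONPATH` key from the hunk, with an arbitrary example path:
   
   ```java
   import org.apache.flink.configuration.Configuration;
   
   public class PythonPathConfigExample {
       public static void main(String[] args) {
           Configuration conf = new Configuration();
           // The key comes from the proposed ConfigOption; the path is made up.
           conf.setString("env.PYTHONPATH", "/opt/flink/python-deps");
           System.out.println(conf.getString("env.PYTHONPATH", ""));
       }
   }
   ```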



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Samrat002 commented on a diff in pull request #21770: [FLINK-30277][python]Allow PYTHONPATH of Python Worker configurable

2023-02-24 Thread via GitHub


Samrat002 commented on code in PR #21770:
URL: https://github.com/apache/flink/pull/21770#discussion_r1117029166


##
docs/static/generated/rest_v1_dispatcher.yml:
##
@@ -6,7 +6,7 @@ info:
   license:
 name: Apache 2.0
 url: https://www.apache.org/licenses/LICENSE-2.0.html
-  version: v1/1.17-SNAPSHOT
+  version: v1/1.18-SNAPSHOT

Review Comment:
   ack i will remove it 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-31213) Aggregation merge engine supports retract inputs

2023-02-24 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-31213:


 Summary: Aggregation merge engine supports retract inputs
 Key: FLINK-31213
 URL: https://issues.apache.org/jira/browse/FLINK-31213
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Reporter: Jingsong Lee
Assignee: Jingsong Lee
 Fix For: table-store-0.4.0


For sum, retraction can be supported.
For the other aggregate functions, which do not support retraction (`UPDATE_BEFORE` and `DELETE` messages), the user can allow them to ignore retraction messages by configuring `'fields.${field_name}.ignore-retract'='true'`.
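
As an illustration, a sketch of supplying that option in DDL via a TableEnvironment; the table name, schema, and the other WITH options are assumptions for illustration only, and just `'fields.${field_name}.ignore-retract'='true'` comes from this issue:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IgnoreRetractExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Hypothetical table and column names; the ignore-retract option is
        // the piece described in this issue.
        tEnv.executeSql(
                "CREATE TABLE t_agg (\n"
                        + "  k INT,\n"
                        + "  total BIGINT,\n"
                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'merge-engine' = 'aggregation',\n"
                        + "  'fields.total.aggregate-function' = 'sum',\n"
                        + "  'fields.total.ignore-retract' = 'true'\n"
                        + ")");
    }
}
{code}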



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zentol commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


zentol commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1117002036


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -157,13 +165,21 @@ public Map calculateScore(
                         .getMaxParallelism(),
                 parallelism.get(evi.getJobVertexId()),
                 evi.getSubtaskIndex());
+        // Estimate state size per key group. For scoring, assume 1 if size estimate is 0 to
+        // accommodate for averaging non-zero states
+        Optional<Long> kgSizeMaybe =
+                stateSizeEstimates.estimate(evi.getJobVertexId()).map(e -> Math.max(e, 1L));
+        if (!kgSizeMaybe.isPresent()) {
+            continue;
+        }

Review Comment:
   > if we place the task on a different TM then it will have to download all its SST files (or am I missing something?)
   
   That part is clear.
   
   If incremental state is also covered (even if an operator didn't add any state to it in the latest checkpoint), then we're good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116992436


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocationsInfo.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Collections.emptyMap;
+
+class AllocationsInfo {

Review Comment:
   Implemented in 6d4971f8f646409ff09173f7dfe1d5245b95706f and 
5fe039b4cb507fcfcc88906c40d86561fc9e8ed5



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-24 Thread via GitHub


RocMarshal commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1116991198


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -945,12 +982,15 @@ public void testAsyncCheckpointingConcurrentCloseBeforeAcknowledge() throws Exce
         // make sure that all state handles have been discarded
         discardFuture.get();
 
-        try {
-            mockEnvironment.getAcknowledgeCheckpointFuture().get(10L, TimeUnit.MILLISECONDS);
-            fail("The checkpoint should not get acknowledged.");
-        } catch (TimeoutException expected) {
-            // future should not be completed
-        }
+        assertThatThrownBy(
+                () -> {
+                    // future should not be completed
+                    mockEnvironment
+                            .getAcknowledgeCheckpointFuture()
+                            .get(10L, TimeUnit.MILLISECONDS);
+                    fail("The checkpoint should not get acknowledged.");

Review Comment:
   Yes. I'll check the same cases in the test.
   thx~
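
   For reference, the usual AssertJ idiom for asserting that a future does not complete within a timeout, as a self-contained sketch; with this pattern the inner `fail(...)` is redundant, because `assertThatThrownBy` already fails when nothing is thrown:
   
   ```java
   import static org.assertj.core.api.Assertions.assertThatThrownBy;
   
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   public class TimeoutAssertionExample {
       public static void main(String[] args) {
           // A never-completed future stands in for the acknowledge-checkpoint
           // future from the test.
           CompletableFuture<Void> neverCompleted = new CompletableFuture<>();
           assertThatThrownBy(() -> neverCompleted.get(10L, TimeUnit.MILLISECONDS))
                   .isInstanceOf(TimeoutException.class);
           System.out.println("future did not complete, as expected");
       }
   }
   ```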



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22017: [BP-1.17][FLINK-31142][Table SQL/Client] Catch TokenMgrError in SqlCommandParserImpl#scan

2023-02-24 Thread via GitHub


flinkbot commented on PR #22017:
URL: https://github.com/apache/flink/pull/22017#issuecomment-1443653369

   
   ## CI report:
   
   * d9a32ec908f18c5b1f6dc79f51f61e0d5f275657 UNKNOWN
   
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
   - `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] liuyongvs commented on a diff in pull request #21993: [FLINK-31166][table] Fix array_contains does not support null argumen…

2023-02-24 Thread via GitHub


liuyongvs commented on code in PR #21993:
URL: https://github.com/apache/flink/pull/21993#discussion_r1116965989


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java:
##
@@ -71,7 +71,11 @@ public final class SpecificInputTypeStrategies {
 
     /** Argument type derived from the array element type. */
     public static final ArgumentTypeStrategy ARRAY_ELEMENT_ARG =
-            new ArrayElementArgumentTypeStrategy();
+            new ArrayElementArgumentTypeStrategy(false);
+
+    /** Argument type derived from the array element type. But leaves nullability untouched. */

Review Comment:
   The reason I added ARRAY_ELEMENT_ARG and ARRAY_ELEMENT_ARG_NULLABLE, mirroring COMMON_ARG/COMMON_ARG_NULLABLE, is that some array functions may need the element argument to be non-null, so I preserved both variants.
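
   For context, a plausible shape of the nullable companion referred to above, assuming the boolean constructor flag toggles whether nullability is preserved (the quoted hunk is truncated before the actual definition); these are stand-in classes, not Flink's:
   
   ```java
   // Stand-in for ArrayElementArgumentTypeStrategy: the flag is assumed to
   // control whether the element argument's nullability is left untouched.
   final class ArrayElementArgStrategy {
       final boolean preserveNullability;
   
       ArrayElementArgStrategy(boolean preserveNullability) {
           this.preserveNullability = preserveNullability;
       }
   }
   
   final class Strategies {
       // Forces the array element argument to be non-null.
       static final ArrayElementArgStrategy ARRAY_ELEMENT_ARG =
               new ArrayElementArgStrategy(false);
       // Leaves nullability untouched, e.g. for array_contains(arr, NULL).
       static final ArrayElementArgStrategy ARRAY_ELEMENT_ARG_NULLABLE =
               new ArrayElementArgStrategy(true);
   }
   ```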



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin opened a new pull request, #22017: [BP-1.17][FLINK-31142][Table SQL/Client] Catch TokenMgrError in SqlCommandParserImpl#scan

2023-02-24 Thread via GitHub


snuyanzin opened a new pull request, #22017:
URL: https://github.com/apache/flink/pull/22017

   This is the 1.17 backport of https://github.com/apache/flink/pull/22007


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-24 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116959052


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -157,13 +165,21 @@ public Map calculateScore(
                         .getMaxParallelism(),
                 parallelism.get(evi.getJobVertexId()),
                 evi.getSubtaskIndex());
+        // Estimate state size per key group. For scoring, assume 1 if size estimate is 0 to
+        // accommodate for averaging non-zero states
+        Optional<Long> kgSizeMaybe =
+                stateSizeEstimates.estimate(evi.getJobVertexId()).map(e -> Math.max(e, 1L));
+        if (!kgSizeMaybe.isPresent()) {
+            continue;
+        }

Review Comment:
   We still need to consider this state: if we place the task on a different TM then it will have to download all its SST files (or am I missing something?)
   
   There are two methods for keyed state:
   1. `handle.getStateSize()` returns the full state size
   2. [`handle.getCheckpointedSize()`](https://github.com/apache/flink/blob/464ded1c2a0497255b70f711167c3b7ae52ea0f7/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java#L62) returns the "incremental" state size
   
   As per above, `getStateSize` is used.
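
   A compact sketch of the distinction drawn above, using a made-up handle and made-up sizes; only the two method names come from `CompositeStateHandle`:
   
   ```java
   // Hypothetical numbers purely for illustration: a handle whose full state
   // is 100 MiB while the latest incremental checkpoint added only 5 MiB.
   class IllustrativeStateHandle {
       long getStateSize() {
           return 100L * 1024 * 1024; // full size: what a new TM must download
       }
   
       long getCheckpointedSize() {
           return 5L * 1024 * 1024; // newly persisted in the latest checkpoint
       }
   }
   
   public class StateSizeMethods {
       public static void main(String[] args) {
           IllustrativeStateHandle handle = new IllustrativeStateHandle();
           // Scoring by getStateSize() reflects the relocation cost, which is
           // why it is used here rather than getCheckpointedSize().
           System.out.println("full: " + handle.getStateSize());
           System.out.println("incremental: " + handle.getCheckpointedSize());
       }
   }
   ```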



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22016: [FLINK-31174][doc] fix inconsistent data format between Learn-Flink Doc and flink-training-repo

2023-02-24 Thread via GitHub


flinkbot commented on PR #22016:
URL: https://github.com/apache/flink/pull/22016#issuecomment-1443641876

   
   ## CI report:
   
   * 8adeac4636d28c86512f53b09de689830fc3 UNKNOWN
   
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
   - `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-3117) Storm Tick Tuples are not supported

2023-02-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-3117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-3117:
--
Labels: pull-request-available  (was: )

> Storm Tick Tuples are not supported
> ---
>
> Key: FLINK-3117
> URL: https://issues.apache.org/jira/browse/FLINK-3117
> Project: Flink
>  Issue Type: Bug
>  Components: Legacy Components / Storm Compatibility
>Affects Versions: 0.10.0, 0.10.1
>Reporter: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
>
> Tick Tuples are Storm's mechanism for Windowing. Every N seconds, Storm 
> triggers emission of a tick tuple which can then be used as a signal to emit 
> or process data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] ChenZhongPu opened a new pull request, #22016: [FLINK-3117][doc] fix inconsistent data format between Learn-Flink Doc and flink-training-repo

2023-02-24 Thread via GitHub


ChenZhongPu opened a new pull request, #22016:
URL: https://github.com/apache/flink/pull/22016

   Due to the previous [PR](https://github.com/apache/flink-training/pull/36), which changed the data format of `TaxiRide`, there is an inconsistency in [Data Pipelines & ETL](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/learn-flink/etl/). To be specific:
   
   - Since `startTime` and `endTime` are no longer available on the same event, I changed the sample code to compute the (Euclidean) distance instead.
   - I also reran the example with the updated code at parallelism 4 to get the new sample output.
   - The Chinese translation is also updated.
   
   Before:
   
   ```java
   Interval rideInterval = new Interval(ride.startTime, ride.endTime);
   Minutes duration = rideInterval.toDuration().toStandardMinutes();
   out.collect(new Tuple2<>(ride.startCell, duration));
   ```
   
   After:
   
   ```java
   double distance = ride.getEuclideanDistance(ride.startLon, ride.startLat);
   out.collect(new Tuple2<>(ride.startCell, distance));
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin merged pull request #22007: [FLINK-31142][Table SQL/Client] Catch TokenMgrError in SqlCommandParserImpl#scan

2023-02-24 Thread via GitHub


snuyanzin merged PR #22007:
URL: https://github.com/apache/flink/pull/22007


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


