[jira] [Updated] (FLINK-23970) Split off the behaviour for finished StreamTask(s)

2022-02-14 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-23970:

Parent: (was: FLINK-23883)
Issue Type: Improvement  (was: Sub-task)

> Split off the behaviour for finished StreamTask(s)
> --
>
> Key: FLINK-23970
> URL: https://issues.apache.org/jira/browse/FLINK-23970
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Yun Gao
>Priority: Major
> Fix For: 1.15.0
>
>
> We will double-check how we could better abstract the behavior of tasks 
> marked as finished on recovery. The goal is to make these behaviors as 
> centralized as possible (e.g., by introducing a specialized 
> _FinishedStreamTask_).



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25432) Introduce common interfaces for cleaning up local and global job data

2022-02-14 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-25432:
--
Release Note: Utilizes the newly added JobResultStore component (see 
FLINK-25431) to pick up jobs that already finished but were not properly 
cleaned up after a failover.

> Introduce common interfaces for cleaning up local and global job data
> -
>
> Key: FLINK-25432
> URL: https://issues.apache.org/jira/browse/FLINK-25432
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.15.0
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> We want to combine the job-specific cleanup of the different resources and 
> provide a common {{ResourceCleaner}} taking care of the actual cleanup of all 
> resources.
> This needs to be integrated into the {{Dispatcher}}.
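The common cleanup entry point described above can be sketched as follows. The `ResourceCleaner` name comes from the ticket; the method signature and the `CompositeResourceCleaner` combining several cleanups behind one entry point are illustrative assumptions, not Flink's actual API.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of a common cleanup abstraction. Only the interface
// name appears in the ticket; everything else is an illustrative assumption.
interface ResourceCleaner {
    // Asynchronously cleans up all job-specific data for the given job.
    CompletableFuture<Void> cleanupAsync(String jobId);
}

// Combines the cleanup of several resources behind one entry point, in the
// spirit of the integration into the Dispatcher that the ticket proposes.
final class CompositeResourceCleaner implements ResourceCleaner {
    private final ResourceCleaner[] delegates;

    CompositeResourceCleaner(ResourceCleaner... delegates) {
        this.delegates = delegates;
    }

    @Override
    public CompletableFuture<Void> cleanupAsync(String jobId) {
        CompletableFuture<?>[] futures = new CompletableFuture<?>[delegates.length];
        for (int i = 0; i < delegates.length; i++) {
            futures[i] = delegates[i].cleanupAsync(jobId);
        }
        // Completes only once every delegate has finished its cleanup.
        return CompletableFuture.allOf(futures);
    }
}
```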





[jira] [Updated] (FLINK-23970) Split off the behaviour for finished StreamTask(s)

2022-02-14 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-23970:

Parent: FLINK-26113
Issue Type: Sub-task  (was: Improvement)

> Split off the behaviour for finished StreamTask(s)
> --
>
> Key: FLINK-23970
> URL: https://issues.apache.org/jira/browse/FLINK-23970
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Yun Gao
>Priority: Major
> Fix For: 1.15.0
>
>
> We will double-check how we could better abstract the behavior of tasks 
> marked as finished on recovery. The goal is to make these behaviors as 
> centralized as possible (e.g., by introducing a specialized 
> _FinishedStreamTask_).





[jira] [Updated] (FLINK-24140) Enhance the ITCase for FLIP-147

2022-02-14 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-24140:

Parent: (was: FLINK-23883)
Issue Type: Improvement  (was: Sub-task)

> Enhance the ITCase for FLIP-147
> ---
>
> Key: FLINK-24140
> URL: https://issues.apache.org/jira/browse/FLINK-24140
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Yun Gao
>Priority: Major
> Fix For: 1.15.0
>
>
> We might enhance the ITCase to cover more cases:
> 1. Sources using new source API (bounded, stop-with-savepoint --drain)
> 2. Multiple-inputs operator with source chained (bounded, stop-with-savepoint 
> --drain)
> 3. Source / sink using UnionListState (bounded, stop-with-savepoint --drain)





[jira] [Updated] (FLINK-24140) Enhance the ITCase for FLIP-147

2022-02-14 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-24140:

Parent: FLINK-26113
Issue Type: Sub-task  (was: Improvement)

> Enhance the ITCase for FLIP-147
> ---
>
> Key: FLINK-24140
> URL: https://issues.apache.org/jira/browse/FLINK-24140
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Yun Gao
>Priority: Major
> Fix For: 1.15.0
>
>
> We might enhance the ITCase to cover more cases:
> 1. Sources using new source API (bounded, stop-with-savepoint --drain)
> 2. Multiple-inputs operator with source chained (bounded, stop-with-savepoint 
> --drain)
> 3. Source / sink using UnionListState (bounded, stop-with-savepoint --drain)





[GitHub] [flink] flinkbot edited a comment on pull request #18681: [FLINK-26032][streaming] check explicit env allowed on StreamExecutionEnvironment own.

2022-02-14 Thread GitBox


flinkbot edited a comment on pull request #18681:
URL: https://github.com/apache/flink/pull/18681#issuecomment-1033535247


   
   ## CI report:
   
   * 77e43dbd211dc9274041996d5a9d24607dc9f953 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=31299)
 
   * 2b10fb621e515705a40c62f4ccbf131992318759 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=31379)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (FLINK-25490) Update the Chinese document related to final checkpoint

2022-02-14 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao reassigned FLINK-25490:
---

Assignee: Yun Gao

> Update the Chinese document related to final checkpoint
> ---
>
> Key: FLINK-25490
> URL: https://issues.apache.org/jira/browse/FLINK-25490
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Runtime / Checkpointing
>Affects Versions: 1.15.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
> Fix For: 1.15.0
>
>






[GitHub] [flink] flinkbot edited a comment on pull request #18726: [FLINK-25983] Add API for configuring maximal watermark drift

2022-02-14 Thread GitBox


flinkbot edited a comment on pull request #18726:
URL: https://github.com/apache/flink/pull/18726#issuecomment-1036101042


   
   ## CI report:
   
   * 3e0350153d25b1ebac5254b968427a9b085787d6 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=31267)
 
   * 06102b172e575b0d2d927e32577fc04d6f9f00bf UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Updated] (FLINK-25433) Integrate retry strategy for cleanup stage

2022-02-14 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-25433:
--
Release Note: Adds retry logic to the cleanup steps of a finished job. This 
feature changes the way Flink jobs are cleaned up: instead of attempting the 
cleanup once, the step is repeated until it succeeds. Users are meant to fix 
the issue that prevents Flink from finalizing the job cleanup. The retry 
intervals can be configured through the newly added configuration parameters 
"jobmanager.cleanup.min-delay" and "jobmanager.cleanup.max-delay".
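The min/max-delay pair above can be illustrated with a small backoff sketch. Only the bounded-delay idea comes from the release note; the doubling strategy and the `RetryBackoff` class are assumptions for illustration, not Flink's actual retry policy.

```java
import java.time.Duration;

// Hedged sketch: retry delays grow between a configured minimum and maximum,
// mirroring "jobmanager.cleanup.min-delay" / "jobmanager.cleanup.max-delay".
// The exponential doubling here is an assumed example strategy.
final class RetryBackoff {
    private final Duration minDelay;
    private final Duration maxDelay;

    RetryBackoff(Duration minDelay, Duration maxDelay) {
        this.minDelay = minDelay;
        this.maxDelay = maxDelay;
    }

    // Delay before the given (1-based) retry attempt: doubles each attempt
    // and is capped at maxDelay. The shift is bounded to avoid overflow.
    Duration delayForAttempt(int attempt) {
        long millis = minDelay.toMillis() << Math.min(attempt - 1, 30);
        return millis >= maxDelay.toMillis() ? maxDelay : Duration.ofMillis(millis);
    }
}
```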

> Integrate retry strategy for cleanup stage
> --
>
> Key: FLINK-25433
> URL: https://issues.apache.org/jira/browse/FLINK-25433
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> The {{ResourceCleaner}} should be able to retry the cleanup infinitely 
> instead of performing it only once.





[GitHub] [flink] twalthr closed pull request #18624: [FLINK-25388][table-planner] Add consumedOptions to ExecNodeMetadata

2022-02-14 Thread GitBox


twalthr closed pull request #18624:
URL: https://github.com/apache/flink/pull/18624


   






[jira] [Closed] (FLINK-25388) Add annotation to all StreamExec nodes

2022-02-14 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-25388.

Fix Version/s: 1.15.0
   Resolution: Fixed

Fixed in master: 1ce16c5c58ebaedf9d3e051ddf8aee8379149320

> Add annotation to all StreamExec nodes
> --
>
> Key: FLINK-25388
> URL: https://issues.apache.org/jira/browse/FLINK-25388
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Assignee: Marios Trivyzas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> - Add an annotation to every ExecNode that we currently support
> - Identify consumed exec config options
> - Leave `producedOperators` empty for now; we will fill it in a later subtask





[jira] [Created] (FLINK-26114) DefaultScheduler fails fatally in case of an error when shutting down the checkpoint-related resources

2022-02-14 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-26114:
-

 Summary: DefaultScheduler fails fatally in case of an error when 
shutting down the checkpoint-related resources
 Key: FLINK-26114
 URL: https://issues.apache.org/jira/browse/FLINK-26114
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.15.0, 1.16.0
Reporter: Matthias Pohl


In contrast to the {{AdaptiveScheduler}}, the {{DefaultScheduler}} fails 
fatally in case of an error while cleaning up the checkpoint-related resources. 
This contradicts our new approach of retrying the cleanup of job-related data 
(see FLINK-25433). Instead, we would want the {{DefaultScheduler}} to return an 
exceptionally completed future with the exception. This enables the 
{{DefaultResourceCleaner}} to trigger a retry.
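The proposed change can be sketched by contrasting the two behaviors. The class and method names below are hypothetical illustrations of the idea, not the actual `DefaultScheduler` API.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative only: contrasts failing fatally on a shutdown error with
// returning an exceptionally completed future, as the ticket proposes.
final class CheckpointResourceShutdown {

    // Undesired behavior: an error thrown during shutdown escapes the call
    // and is treated as fatal by the surrounding process.
    static void shutdownFatally(Runnable closeResources) {
        closeResources.run();
    }

    // Desired behavior: capture the error in the returned future so that a
    // cleanup component can observe the failure and schedule a retry.
    static CompletableFuture<Void> shutdownAsync(Runnable closeResources) {
        try {
            closeResources.run();
            return CompletableFuture.completedFuture(null);
        } catch (Throwable t) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(t);
            return failed;
        }
    }
}
```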





[jira] [Updated] (FLINK-25388) Identify topology changing config options for all StreamExec nodes

2022-02-14 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-25388:
-
Description: 
- Add an annotation to every ExecNode that we currently support
- Identify consumed exec config options

  was:
- Add an annotation to every ExecNode that we currently support
- Identify consumed exec config options
- Leave `producedOperators` empty for now; we will fill it in a later subtask


> Identify topology changing config options for all StreamExec nodes
> --
>
> Key: FLINK-25388
> URL: https://issues.apache.org/jira/browse/FLINK-25388
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Assignee: Marios Trivyzas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> - Add an annotation to every ExecNode that we currently support
> - Identify consumed exec config options





[jira] [Updated] (FLINK-25388) Identify topology changing config options for all StreamExec nodes

2022-02-14 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-25388:
-
Summary: Identify topology changing config options for all StreamExec nodes 
 (was: Add annotation to all StreamExec nodes)

> Identify topology changing config options for all StreamExec nodes
> --
>
> Key: FLINK-25388
> URL: https://issues.apache.org/jira/browse/FLINK-25388
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Assignee: Marios Trivyzas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> - Add an annotation to every ExecNode that we currently support
> - Identify consumed exec config options
> - Leave `producedOperators` empty for now; we will fill it in a later subtask





[jira] [Created] (FLINK-26115) Multiple Kafka connector tests failed due to The topic metadata failed to propagate to Kafka broker

2022-02-14 Thread Yun Gao (Jira)
Yun Gao created FLINK-26115:
---

 Summary: Multiple Kafka connector tests failed due to The topic 
metadata failed to propagate to Kafka broker
 Key: FLINK-26115
 URL: https://issues.apache.org/jira/browse/FLINK-26115
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.14.3, 1.13.5, 1.15.0
Reporter: Yun Gao


This issue tracks all the related failures with "The topic metadata failed to 
propagate to Kafka broker".





[jira] [Updated] (FLINK-23431) KafkaProducerExactlyOnceITCase.testMultipleSinkOperators fails with "The topic metadata failed to propagate to Kafka broker."

2022-02-14 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-23431:

Parent: FLINK-26115
Issue Type: Sub-task  (was: Bug)

> KafkaProducerExactlyOnceITCase.testMultipleSinkOperators fails with "The 
> topic metadata failed to propagate to Kafka broker."
> -
>
> Key: FLINK-23431
> URL: https://issues.apache.org/jira/browse/FLINK-23431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: 1.14.0, 1.13.1, 1.15.0
>Reporter: Dawid Wysakowicz
>Assignee: Fabian Paul
>Priority: Major
>  Labels: stale-assigned, test-stability
> Fix For: 1.15.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=20694&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5&l=6879
> {code}
> Jul 19 14:37:19 [ERROR] 
> testMultipleSinkOperators(org.apache.flink.streaming.connectors.kafka.KafkaProducerExactlyOnceITCase)
>   Time elapsed: 11.135 s  <<< FAILURE!
> Jul 19 14:37:19 java.lang.AssertionError: Create test topic : 
> exactlyTopicCustomOperator20 failed, The topic metadata failed to propagate 
> to Kafka broker.
> Jul 19 14:37:19   at org.junit.Assert.fail(Assert.java:88)
> Jul 19 14:37:19   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:226)
> Jul 19 14:37:19   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:112)
> Jul 19 14:37:19   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:212)
> Jul 19 14:37:19   at 
> org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testExactlyOnce(KafkaProducerTestBase.java:368)
> Jul 19 14:37:19   at 
> org.apache.flink.streaming.connectors.kafka.KafkaProducerExactlyOnceITCase.testMultipleSinkOperators(KafkaProducerExactlyOnceITCase.java:60)
> Jul 19 14:37:19   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Jul 19 14:37:19   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jul 19 14:37:19   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jul 19 14:37:19   at java.lang.reflect.Method.invoke(Method.java:498)
> Jul 19 14:37:19   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> Jul 19 14:37:19   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jul 19 14:37:19   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> Jul 19 14:37:19   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jul 19 14:37:19   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jul 19 14:37:19   at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> Jul 19 14:37:19   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Jul 19 14:37:19   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> Jul 19 14:37:19   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> Jul 19 14:37:19   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> Jul 19 14:37:19   at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> Jul 19 14:37:19   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> Jul 19 14:37:19   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> Jul 19 14:37:19   at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> Jul 19 14:37:19   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> Jul 19 14:37:19   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Jul 19 14:37:19   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Jul 19 14:37:19   at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> Jul 19 14:37:19   at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> Jul 19 14:37:19   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Jul 19 14:37:19   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> Jul 19 14:37:19   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
> Jul 19 14:37:19   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
>

[GitHub] [flink] rkhachatryan commented on a change in pull request #18741: [FLINK-26101][changelog] Avoid shared state registry to discard multi-registered identical changelog state

2022-02-14 Thread GitBox


rkhachatryan commented on a change in pull request #18741:
URL: https://github.com/apache/flink/pull/18741#discussion_r805588711



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
##
@@ -237,6 +237,23 @@ public FSDataInputStream openInputStream() throws IOException {
         public Optional asBytesIfInMemory() {
             throw new UnsupportedOperationException("Should not call here.");
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            StreamStateHandleWrapper that = (StreamStateHandleWrapper) o;
+            return Objects.equals(keyedStateHandle, that.keyedStateHandle);

Review comment:
   There is one more problem here - `PlaceholderStreamStateHandle`.
   
   If there were two or more materializations, but the internal backend used a 
previously uploaded SST, then `equals` will return false. The wrapper will 
receive a discard call again.
   
   I 
[used](https://github.com/apache/flink/pull/18556/commits/2403a8446da47c558d1d0c3283d056ac6f7b1988#diff-9ec20ab5bc5e16e0c04167e1b546902cdb49635ac5d15f4172fb7f36a98c77bdR250)
 `getStateHandleId` for comparison, and it solved the problem.
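The suggestion in the comment, comparing by a stable state-handle ID so that a placeholder standing in for a previously uploaded file still compares equal to the original handle, can be sketched as follows. The class and field names are illustrative, not the actual Flink types.

```java
import java.util.Objects;

// Sketch of ID-based equality: wrappers around different handle objects
// (e.g. a real handle and a placeholder for the same uploaded file) compare
// equal as long as they share the same stable state-handle ID.
final class StateHandleWrapper {
    private final String stateHandleId; // stable ID shared with placeholders

    StateHandleWrapper(String stateHandleId) {
        this.stateHandleId = stateHandleId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        return Objects.equals(stateHandleId, ((StateHandleWrapper) o).stateHandleId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(stateHandleId);
    }
}
```

With this scheme a second registration of the same logical state does not look like a new object, so the shared state registry would not issue a spurious discard call.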








[jira] [Updated] (FLINK-25522) KafkaShuffleExactlyOnceITCase.testAssignedToPartitionFailureRecoveryProcessingTime

2022-02-14 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-25522:

Parent: FLINK-26115
Issue Type: Sub-task  (was: Bug)

> KafkaShuffleExactlyOnceITCase.testAssignedToPartitionFailureRecoveryProcessingTime
> --
>
> Key: FLINK-25522
> URL: https://issues.apache.org/jira/browse/FLINK-25522
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: 1.15.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.15.0
>
>
> The test 
> {{KafkaShuffleExactlyOnceITCase.testAssignedToPartitionFailureRecoveryProcessingTime}}
>  failed on AZP with:
> {code}
> 2022-01-05T04:31:25.7208273Z java.util.concurrent.TimeoutException: The topic 
> metadata failed to propagate to Kafka broker.
> 2022-01-05T04:31:25.7210543Z  at 
> org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:214)
> 2022-01-05T04:31:25.7211289Z  at 
> org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:230)
> 2022-01-05T04:31:25.7212025Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:216)
> 2022-01-05T04:31:25.7212944Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:98)
> 2022-01-05T04:31:25.7213794Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:216)
> 2022-01-05T04:31:25.7214854Z  at 
> org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase.testAssignedToPartitionFailureRecovery(KafkaShuffleExactlyOnceITCase.java:158)
> 2022-01-05T04:31:25.7215823Z  at 
> org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase.testAssignedToPartitionFailureRecoveryProcessingTime(KafkaShuffleExactlyOnceITCase.java:81)
> 2022-01-05T04:31:25.7216532Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-01-05T04:31:25.7217307Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-01-05T04:31:25.7217917Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-01-05T04:31:25.7218437Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2022-01-05T04:31:25.7218969Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-01-05T04:31:25.7219572Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-01-05T04:31:25.7220183Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2022-01-05T04:31:25.7220770Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2022-01-05T04:31:25.7221346Z  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2022-01-05T04:31:25.7221959Z  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> 2022-01-05T04:31:25.7222603Z  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> 2022-01-05T04:31:25.7223413Z  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2022-01-05T04:31:25.7223871Z  at java.lang.Thread.run(Thread.java:748)
> 2022-01-05T04:31:25.7321823Z java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'partition_failure_recovery_ProcessingTime' already exists.
> 2022-01-05T04:31:25.7323411Z  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> 2022-01-05T04:31:25.7324069Z  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> 2022-01-05T04:31:25.7324696Z  at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> 2022-01-05T04:31:25.7325309Z  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> 2022-01-05T04:31:25.7326077Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:214)
> 2022-01-05T04:31:25.7326999Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:98)
> 2022-01-05T04:31:25.7327659Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:216)
> 2022-01-05T04:31:25.7328418Z  at 
> org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase.testAssignedToPartitionFailureRecovery(KafkaShuffleExactlyOnceITCase.java:158)
> 2022-01-05T04:31:25.732

[jira] [Updated] (FLINK-24946) KafkaSourceLegacyITCase.testBigRecordJob fails on azure

2022-02-14 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-24946:

Parent: FLINK-26115
Issue Type: Sub-task  (was: Bug)

> KafkaSourceLegacyITCase.testBigRecordJob fails on azure
> ---
>
> Key: FLINK-24946
> URL: https://issues.apache.org/jira/browse/FLINK-24946
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: 1.15.0
>Reporter: Yun Gao
>Assignee: Fabian Paul
>Priority: Major
>  Labels: stale-assigned, test-stability
>
> {code:java}
> Nov 17 16:43:38 [ERROR] Tests run: 21, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 189.411 s <<< FAILURE! - in 
> org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase
> Nov 17 16:43:38 [ERROR] testBigRecordJob  Time elapsed: 13.21 s  <<< FAILURE!
> Nov 17 16:43:38 java.lang.AssertionError: Create test topic : 
> bigRecordTestTopic failed, The topic metadata failed to propagate to Kafka 
> broker.
> Nov 17 16:43:38   at org.junit.Assert.fail(Assert.java:89)
> Nov 17 16:43:38   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:221)
> Nov 17 16:43:38   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:112)
> Nov 17 16:43:38   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:213)
> Nov 17 16:43:38   at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runBigRecordTestTopology(KafkaConsumerTestBase.java:1378)
> Nov 17 16:43:38   at 
> org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase.testBigRecordJob(KafkaSourceLegacyITCase.java:101)
> Nov 17 16:43:38   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Nov 17 16:43:38   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Nov 17 16:43:38   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Nov 17 16:43:38   at java.lang.reflect.Method.invoke(Method.java:498)
> Nov 17 16:43:38   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Nov 17 16:43:38   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Nov 17 16:43:38   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Nov 17 16:43:38   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Nov 17 16:43:38   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Nov 17 16:43:38   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Nov 17 16:43:38   at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> Nov 17 16:43:38   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Nov 17 16:43:38   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> Nov 17 16:43:38   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> Nov 17 16:43:38   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> Nov 17 16:43:38   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> Nov 17 16:43:38   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Nov 17 16:43:38   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Nov 17 16:43:38   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> Nov 17 16:43:38   at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>  {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=26672&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=15a22db7-8faa-5b34-3920-d33c9f0ca23c&l=6735





[jira] [Commented] (FLINK-24821) FlinkKafkaProducerMigrationOperatorTest.testRestoreProducer fails on AZP

2022-02-14 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17491854#comment-17491854
 ] 

Yun Gao commented on FLINK-24821:
-

I'll close this issue for now since https://github.com/apache/flink/pull/16108 is 
merged and the failure has not been reproduced for a long time. 

> FlinkKafkaProducerMigrationOperatorTest.testRestoreProducer fails on AZP
> 
>
> Key: FLINK-24821
> URL: https://issues.apache.org/jira/browse/FLINK-24821
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.0, 1.15.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.15.0
>
>
> The test {{FlinkKafkaProducerMigrationOperatorTest.testRestoreProducer}} 
> fails on AZP with
> {code}
> Nov 07 23:02:14 [ERROR] testRestoreProducer[Migration Savepoint: 1.8]  Time 
> elapsed: 2.008 s  <<< ERROR!
> Nov 07 23:02:14 java.net.BindException: Address already in use
> Nov 07 23:02:14   at sun.nio.ch.Net.bind0(Native Method)
> Nov 07 23:02:14   at sun.nio.ch.Net.bind(Net.java:461)
> Nov 07 23:02:14   at sun.nio.ch.Net.bind(Net.java:453)
> Nov 07 23:02:14   at 
> sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:222)
> Nov 07 23:02:14   at 
> sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:85)
> Nov 07 23:02:14   at 
> sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:78)
> Nov 07 23:02:14   at 
> org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:90)
> Nov 07 23:02:14   at 
> org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:120)
> Nov 07 23:02:14   at 
> org.apache.curator.test.TestingZooKeeperMain.runFromConfig(TestingZooKeeperMain.java:93)
> Nov 07 23:02:14   at 
> org.apache.curator.test.TestingZooKeeperServer$1.run(TestingZooKeeperServer.java:148)
> Nov 07 23:02:14   at java.lang.Thread.run(Thread.java:748)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=26096&view=logs&j=72d4811f-9f0d-5fd0-014a-0bc26b72b642&t=e424005a-b16e-540f-196d-da062cc19bdf&l=7302
> It looks like there is a race condition for available ports.
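The failure mode above (probe for a free port, then bind it in a separate step) can be avoided in plain Java by binding to port 0, which asks the OS for a free ephemeral port atomically. A minimal sketch, independent of the Flink test harness and Curator:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class EphemeralPortExample {

    /** Binds to port 0 so the OS assigns a free ephemeral port, then returns it. */
    static int bindEphemeralPort() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 lets the kernel pick any free port atomically, removing the
            // "find a free port, then bind it" window seen in the stack trace above.
            server.bind(new InetSocketAddress(0));
            return ((InetSocketAddress) server.getLocalAddress()).getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bound port > 0: " + (bindEphemeralPort() > 0));
    }
}
```

By contrast, any helper that picks a "random" port by binding and then releasing a socket leaves a window in which another process (or a concurrently running test) can grab the port before it is re-bound.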



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-24821) FlinkKafkaProducerMigrationOperatorTest.testRestoreProducer fails on AZP

2022-02-14 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao closed FLINK-24821.
---
  Assignee: Arvid Heise
Resolution: Fixed

> FlinkKafkaProducerMigrationOperatorTest.testRestoreProducer fails on AZP
> 
>
> Key: FLINK-24821
> URL: https://issues.apache.org/jira/browse/FLINK-24821
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.0, 1.15.0
>Reporter: Till Rohrmann
>Assignee: Arvid Heise
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.15.0
>
>
> The test {{FlinkKafkaProducerMigrationOperatorTest.testRestoreProducer}} 
> fails on AZP with
> {code}
> Nov 07 23:02:14 [ERROR] testRestoreProducer[Migration Savepoint: 1.8]  Time 
> elapsed: 2.008 s  <<< ERROR!
> Nov 07 23:02:14 java.net.BindException: Address already in use
> Nov 07 23:02:14   at sun.nio.ch.Net.bind0(Native Method)
> Nov 07 23:02:14   at sun.nio.ch.Net.bind(Net.java:461)
> Nov 07 23:02:14   at sun.nio.ch.Net.bind(Net.java:453)
> Nov 07 23:02:14   at 
> sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:222)
> Nov 07 23:02:14   at 
> sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:85)
> Nov 07 23:02:14   at 
> sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:78)
> Nov 07 23:02:14   at 
> org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:90)
> Nov 07 23:02:14   at 
> org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:120)
> Nov 07 23:02:14   at 
> org.apache.curator.test.TestingZooKeeperMain.runFromConfig(TestingZooKeeperMain.java:93)
> Nov 07 23:02:14   at 
> org.apache.curator.test.TestingZooKeeperServer$1.run(TestingZooKeeperServer.java:148)
> Nov 07 23:02:14   at java.lang.Thread.run(Thread.java:748)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=26096&view=logs&j=72d4811f-9f0d-5fd0-014a-0bc26b72b642&t=e424005a-b16e-540f-196d-da062cc19bdf&l=7302
> It looks like there is a race condition for available ports.





[jira] [Updated] (FLINK-26115) Multiple Kafka connector tests failed due to The topic metadata failed to propagate to Kafka broker

2022-02-14 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-26115:

Component/s: Connectors / Kafka

> Multiple Kafka connector tests failed due to The topic metadata failed to 
> propagate to Kafka broker
> ---
>
> Key: FLINK-26115
> URL: https://issues.apache.org/jira/browse/FLINK-26115
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.15.0, 1.13.5, 1.14.3
>Reporter: Yun Gao
>Priority: Major
>  Labels: test-stability
>
> This issue tracks all the issues related to "The topic metadata failed to 
> propagate to Kafka broker".





[GitHub] [flink] dmvk commented on a change in pull request #18689: [FLINK-21439][runtime] Exception history adaptive scheduler

2022-02-14 Thread GitBox


dmvk commented on a change in pull request #18689:
URL: https://github.com/apache/flink/pull/18689#discussion_r805592620



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##
@@ -306,22 +331,88 @@ void deliverOperatorEventToCoordinator(
 operatorId, request);
 }
 
+/** Transition to different state when failure occurs. Stays in the same 
state by default. */
+abstract void onFailure(Throwable cause);
+
+/**
+ * Transition to different state when the execution graph reaches a 
globally terminal state.
+ *
+ * @param globallyTerminalState globally terminal state which the 
execution graph reached
+ */
+abstract void onGloballyTerminalState(JobStatus globallyTerminalState);
+
+@Override
+public void handleGlobalFailure(Throwable cause) {
+failureCollection.add(new GlobalFailure(cause));
+onFailure(cause);
+}
+
 /**
  * Updates the execution graph with the given task execution state 
transition.
  *
  * @param taskExecutionStateTransition taskExecutionStateTransition to 
update the ExecutionGraph
  * with
  * @return {@code true} if the update was successful; otherwise {@code 
false}
  */
-abstract boolean updateTaskExecutionState(
-TaskExecutionStateTransition taskExecutionStateTransition);
+boolean updateTaskExecutionState(TaskExecutionStateTransition 
taskExecutionStateTransition) {
+if (taskExecutionStateTransition.getExecutionState() != 
ExecutionState.FAILED) {
+return 
getExecutionGraph().updateState(taskExecutionStateTransition);
+}
 
-/**
- * Callback which is called once the execution graph reaches a globally 
terminal state.
- *
- * @param globallyTerminalState globally terminal state which the 
execution graph reached
- */
-abstract void onGloballyTerminalState(JobStatus globallyTerminalState);
+// We need to collect the ExecutionVertexID before updating the state, 
because the Execution
+// is de-registered afterwards.
+// We need to use an optional here, because this method can be called 
even after the
+// Execution is de-registered.
+Optional<ExecutionVertexID> idOpt =
+executionGraph.findExecutionVertexId(taskExecutionStateTransition.getID());
+final boolean successfulUpdate =
+getExecutionGraph().updateState(taskExecutionStateTransition);
+if (!successfulUpdate) {
+return false;
+}
+
+Throwable cause = 
extractErrorOrUseDefault(taskExecutionStateTransition);
+
+checkState(idOpt.isPresent());
+ExecutionVertexID id = idOpt.get();
+
+if (getNonEmptyExecution(id).getFailureInfo().isPresent()) {
+failureCollection.add(new LocalFailure(cause, id));
+}
+onFailure(cause);
+return true;

Review comment:
   It seems you've been faster to fix this :) My suggestion would have been 
something along these lines:
   
   ```
   boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionStateTransition) {
       final Optional<ExecutionVertexID> maybeExecutionVertexId =
               executionGraph.findExecutionVertexId(taskExecutionStateTransition.getID());
       final boolean successfulUpdate =
               getExecutionGraph().updateState(taskExecutionStateTransition);
       if (successfulUpdate) {
           // We're sure that the executionVertexId has been found, because we've been able to
           // update the execution graph.
           final ExecutionVertexID executionVertexId =
                   maybeExecutionVertexId.orElseThrow(NoSuchElementException::new);
           final ExecutionState desiredState = taskExecutionStateTransition.getExecutionState();
           final ExecutionState newState =
                   executionGraph
                           .findExecution(executionVertexId)
                           .map(Execution::getState)
                           .orElseThrow(NoSuchElementException::new);
           // We only want a notification for the actual transition into the FAILED state.
           if (desiredState == ExecutionState.FAILED && desiredState == newState) {
               final Throwable cause = extractErrorOrUseDefault(taskExecutionStateTransition);
               if (getNonEmptyExecution(executionVertexId).getFailureInfo().isPresent()) {
                   failureCollection.add(new LocalFailure(cause, executionVertexId));
               }
               onFailure(cause);
               return true;
           }
       }
       return false;
   }
   ```
   
   Two notes:
   - This de-duplicates the call to the state update
   - More importantly, this also checks whether we've actually reached the 
desired state, to align with th

[GitHub] [flink] metaswirl commented on a change in pull request #18689: [FLINK-21439][runtime] Exception history adaptive scheduler

2022-02-14 Thread GitBox


metaswirl commented on a change in pull request #18689:
URL: https://github.com/apache/flink/pull/18689#discussion_r805592737



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##
@@ -306,22 +331,88 @@ void deliverOperatorEventToCoordinator(
 operatorId, request);
 }
 
+/** Transition to different state when failure occurs. Stays in the same 
state by default. */
+abstract void onFailure(Throwable cause);
+
+/**
+ * Transition to different state when the execution graph reaches a 
globally terminal state.
+ *
+ * @param globallyTerminalState globally terminal state which the 
execution graph reached
+ */
+abstract void onGloballyTerminalState(JobStatus globallyTerminalState);
+
+@Override
+public void handleGlobalFailure(Throwable cause) {
+failureCollection.add(new GlobalFailure(cause));
+onFailure(cause);
+}
+
 /**
  * Updates the execution graph with the given task execution state 
transition.
  *
  * @param taskExecutionStateTransition taskExecutionStateTransition to 
update the ExecutionGraph
  * with
  * @return {@code true} if the update was successful; otherwise {@code 
false}
  */
-abstract boolean updateTaskExecutionState(
-TaskExecutionStateTransition taskExecutionStateTransition);
+boolean updateTaskExecutionState(TaskExecutionStateTransition 
taskExecutionStateTransition) {

Review comment:
   This method looks complex, because it handles so many edge cases.
   
   1. L366: `updateState` with state `FAILED` de-registers the `Execution`, so 
we need to collect the ExecutionVertexID beforehand.
   2. L366: `updateTaskExecutionState` can be called multiple times with state 
`FAILED`. After the first call, the `Execution` is already de-registered. So, 
we need to use an optional here for the id. 
   3. L376: The `updateState` call will return false if no Execution is 
present. Hence, in line 376, the id should always be available. Anything else 
would be an unexpected state.
   4. L379: If the state of the `Execution` is `CANCELLING` (and some others) 
before the `updateState` call, then the failure will not be stored on the 
Execution. We currently ignore these failures.
   
   For now, I would propose to keep the code as it is and untangle these edge 
cases afterwards.
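The look-up-before-update pattern these edge cases describe can be sketched generically. The types below (a string-keyed registry) are hypothetical stand-ins, not Flink's scheduler classes:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// A registry whose state-mutating update de-registers the entry, so any
// metadata must be captured *before* the update (edge cases 1 and 2), and a
// successful update implies the id was present (edge case 3).
public class DeregisteringRegistry {
    private final Map<String, String> vertexByAttempt = new HashMap<>();

    public void register(String attemptId, String vertexId) {
        vertexByAttempt.put(attemptId, vertexId);
    }

    /** May legitimately be empty: the attempt can already be de-registered. */
    public Optional<String> findVertexId(String attemptId) {
        return Optional.ofNullable(vertexByAttempt.get(attemptId));
    }

    /** Returns true only for the first FAILED update of an attempt. */
    public boolean updateToFailed(String attemptId) {
        // Capture the id first; the update below removes the mapping.
        Optional<String> vertexId = findVertexId(attemptId);
        boolean successfulUpdate = vertexByAttempt.remove(attemptId) != null;
        if (successfulUpdate) {
            // If the update succeeded, the id must have been found.
            vertexId.orElseThrow(IllegalStateException::new);
        }
        return successfulUpdate;
    }

    public static void main(String[] args) {
        DeregisteringRegistry registry = new DeregisteringRegistry();
        registry.register("attempt-1", "vertex-1");
        System.out.println(registry.updateToFailed("attempt-1")); // true
        System.out.println(registry.updateToFailed("attempt-1")); // false
    }
}
```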




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-25101) FlinkKafkaProducerITCase.testMigrateFromAtLeastOnceToExactlyOnce failed due to CorrelationIdMismatchException

2022-02-14 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-25101:

Description: 
{code:java}
Nov 29 17:19:22 [ERROR] Tests run: 15, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 279.756 s <<< FAILURE! - in 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase
Nov 29 17:19:22 [ERROR] 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testMigrateFromAtLeastOnceToExactlyOnce
  Time elapsed: 1.265 s  <<< ERROR!
Nov 29 17:19:22 
org.apache.kafka.common.requests.CorrelationIdMismatchException: Correlation id 
for response (1179651) does not match request (1), request header: 
RequestHeader(apiKey=API_VERSIONS, apiVersion=3, 
clientId=consumer-flink-tests-8, correlationId=1)
Nov 29 17:19:22 at 
org.apache.kafka.common.requests.AbstractResponse.parseResponse(AbstractResponse.java:106)
Nov 29 17:19:22 at 
org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:730)
Nov 29 17:19:22 at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:875)
Nov 29 17:19:22 at 
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:570)
Nov 29 17:19:22 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
Nov 29 17:19:22 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
Nov 29 17:19:22 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
Nov 29 17:19:22 at 
org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:374)
Nov 29 17:19:22 at 
org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1948)
Nov 29 17:19:22 at 
org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1916)
Nov 29 17:19:22 at 
org.apache.flink.connector.kafka.sink.KafkaUtil.getAllPartitions(KafkaUtil.java:170)
Nov 29 17:19:22 at 
org.apache.flink.connector.kafka.sink.KafkaUtil.drainAllRecordsFromTopic(KafkaUtil.java:132)
Nov 29 17:19:22 at 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.getAllRecordsFromTopic(KafkaTestEnvironmentImpl.java:280)
Nov 29 17:19:22 at 
org.apache.flink.streaming.connectors.kafka.KafkaTestBase.assertExactlyOnceForTopic(KafkaTestBase.java:317)
Nov 29 17:19:22 at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testMigrateFromAtLeastOnceToExactlyOnce(FlinkKafkaProducerITCase.java:592)
Nov 29 17:19:22 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
Nov 29 17:19:22 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Nov 29 17:19:22 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Nov 29 17:19:22 at java.lang.reflect.Method.invoke(Method.java:498)
Nov 29 17:19:22 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
Nov 29 17:19:22 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
Nov 29 17:19:22 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
Nov 29 17:19:22 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
Nov 29 17:19:22 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
Nov 29 17:19:22 at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
Nov 29 17:19:22 at 
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
Nov 29 17:19:22 at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
Nov 29 17:19:22 at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
Nov 29 17:19:22 at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
Nov 29 17:19:22 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
Nov 29 17:19:22 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
Nov 29 17:19:22 at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
Nov 29 17:19:22 at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
Nov 29 17:19:22 at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
Nov 29 17:19:22 at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
Nov 29 17:19:22 at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)

 {code}
[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=27226&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=15a22db7-8faa-5b34-3920-d33c9f0ca23c&l=35495]

[GitHub] [flink] rkhachatryan commented on a change in pull request #18729: [FLINK-26093][tests] Adjust SavepointFormatITCase for ChangelogStateBackend

2022-02-14 Thread GitBox


rkhachatryan commented on a change in pull request #18729:
URL: https://github.com/apache/flink/pull/18729#discussion_r805595444



##
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
##
@@ -78,138 +82,151 @@
 LoggerAuditingExtension loggerAuditingExtension =
 new LoggerAuditingExtension(SavepointFormatITCase.class, 
Level.INFO);
 
-private static Stream<Arguments> parameters() {
-return Stream.of(
-Arguments.of(
-SavepointFormatType.CANONICAL,
-HEAP,
-(Consumer)
-keyedState ->
-assertThat(
-keyedState,
-
instanceOf(SavepointKeyedStateHandle.class))),
-Arguments.of(
-SavepointFormatType.NATIVE,
-HEAP,
-(Consumer)
-keyedState ->
-assertThat(
-keyedState,
-
instanceOf(KeyGroupsStateHandle.class))),
-Arguments.of(
-SavepointFormatType.CANONICAL,
-ROCKSDB_FULL_SNAPSHOTS,
-(Consumer)
-keyedState ->
-assertThat(
-keyedState,
-
instanceOf(SavepointKeyedStateHandle.class))),
-Arguments.of(
-SavepointFormatType.NATIVE,
-ROCKSDB_FULL_SNAPSHOTS,
-(Consumer)
-keyedState ->
-assertThat(
-keyedState,
-
instanceOf(KeyGroupsStateHandle.class))),
-Arguments.of(
-SavepointFormatType.CANONICAL,
-ROCKSDB_INCREMENTAL_SNAPSHOTS,
-(Consumer)
-keyedState ->
-assertThat(
-keyedState,
-
instanceOf(SavepointKeyedStateHandle.class))),
-Arguments.of(
-SavepointFormatType.NATIVE,
-ROCKSDB_INCREMENTAL_SNAPSHOTS,
-(Consumer)
-keyedState ->
-assertThat(
-keyedState,
-instanceOf(
-IncrementalRemoteKeyedStateHandle.class))));
+private static List<Arguments> parameters() {

Review comment:
   I agree that the assertions are now disconnected from the inputs.
   But looking at it from a different perspective, this makes it easy to understand 
what the end result should be via `validateState`; for example, `CANONICAL` 
savepoints always result in `SavepointKeyedStateHandle`.
   
   So I'd leave it with the loops.
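The loop-based construction being defended here can be sketched outside of JUnit, with placeholder enums standing in for the real savepoint formats and state-backend configurations:

```java
import java.util.ArrayList;
import java.util.List;

public class ParameterMatrix {
    // Placeholder enums; the real test uses SavepointFormatType and
    // state-backend configurations.
    enum FormatType { CANONICAL, NATIVE }
    enum Backend { HEAP, ROCKSDB_FULL_SNAPSHOTS, ROCKSDB_INCREMENTAL_SNAPSHOTS }

    // Cross product via loops: the expected handle class can then be derived
    // from the format alone inside the validation step, which keeps the
    // invariant "CANONICAL always yields a savepoint-style handle" visible.
    static List<Object[]> parameters() {
        List<Object[]> result = new ArrayList<>();
        for (FormatType format : FormatType.values()) {
            for (Backend backend : Backend.values()) {
                result.add(new Object[] {format, backend});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parameters().size()); // 2 formats x 3 backends = 6
    }
}
```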








[GitHub] [flink] flinkbot edited a comment on pull request #18472: [FLINK-25783][docs-zh] Translate azure_table_storage.md page into Chinese.

2022-02-14 Thread GitBox


flinkbot edited a comment on pull request #18472:
URL: https://github.com/apache/flink/pull/18472#issuecomment-1020057651


   
   ## CI report:
   
   * 68166baa82ed0f713e304d20b6c300b903ff4bff Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=31370)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Commented] (FLINK-26067) ZooKeeperLeaderElectionConnectionHandlingTest. testLoseLeadershipOnLostConnectionIfTolerateSuspendedConnectionsIsEnabled failed due to timeout

2022-02-14 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17491860#comment-17491860
 ] 

Yun Gao commented on FLINK-26067:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=31195&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=7169

> ZooKeeperLeaderElectionConnectionHandlingTest. 
> testLoseLeadershipOnLostConnectionIfTolerateSuspendedConnectionsIsEnabled 
> failed due to timeout
> --
>
> Key: FLINK-26067
> URL: https://issues.apache.org/jira/browse/FLINK-26067
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.0
>Reporter: Yun Gao
>Assignee: David Morávek
>Priority: Major
>  Labels: pull-request-available, test-stability
>
> {code:java}
> Feb 09 08:58:56 [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 18.67 s <<< FAILURE! - in 
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionConnectionHandlingTest
> Feb 09 08:58:56 [ERROR] 
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionConnectionHandlingTest.testLoseLeadershipOnLostConnectionIfTolerateSuspendedConnectionsIsEnabled
>   Time elapsed: 8.096 s  <<< ERROR!
> Feb 09 08:58:56 java.util.concurrent.TimeoutException
> Feb 09 08:58:56   at 
> org.apache.flink.core.testutils.OneShotLatch.await(OneShotLatch.java:106)
> Feb 09 08:58:56   at 
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionConnectionHandlingTest$TestingContender.awaitRevokeLeadership(ZooKeeperLeaderElectionConnectionHandlingTest.java:211)
> Feb 09 08:58:56   at 
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionConnectionHandlingTest.lambda$testLoseLeadershipOnLostConnectionIfTolerateSuspendedConnectionsIsEnabled$2(ZooKeeperLeaderElectionConnectionHandlingTest.java:100)
> Feb 09 08:58:56   at 
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionConnectionHandlingTest.runTestWithZooKeeperConnectionProblem(ZooKeeperLeaderElectionConnectionHandlingTest.java:164)
> Feb 09 08:58:56   at 
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionConnectionHandlingTest.runTestWithLostZooKeeperConnection(ZooKeeperLeaderElectionConnectionHandlingTest.java:109)
> Feb 09 08:58:56   at 
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionConnectionHandlingTest.testLoseLeadershipOnLostConnectionIfTolerateSuspendedConnectionsIsEnabled(ZooKeeperLeaderElectionConnectionHandlingTest.java:96)
> Feb 09 08:58:56   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Feb 09 08:58:56   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Feb 09 08:58:56   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Feb 09 08:58:56   at java.lang.reflect.Method.invoke(Method.java:498)
> Feb 09 08:58:56   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Feb 09 08:58:56   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Feb 09 08:58:56   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Feb 09 08:58:56   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Feb 09 08:58:56   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Feb 09 08:58:56   at 
> org.apache.flink.runtime.util.TestingFatalErrorHandlerResource$CloseableStatement.evaluate(TestingFatalErrorHandlerResource.java:94)
> Feb 09 08:58:56   at 
> org.apache.flink.runtime.util.TestingFatalErrorHandlerResource$CloseableStatement.access$200(TestingFatalErrorHandlerResource.java:86)
> Feb 09 08:58:56   at 
> org.apache.flink.runtime.util.TestingFatalErrorHandlerResource$1.evaluate(TestingFatalErrorHandlerResource.java:58)
> Feb 09 08:58:56   at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> Feb 09 08:58:56   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Feb 09 08:58:56   at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> Feb 09 08:58:56   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Feb 09 08:58:56   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> Feb 09 08:58:56   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> Feb 09 08:58:56   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> Feb 0

[GitHub] [flink] flinkbot edited a comment on pull request #18726: [FLINK-25983] Add API for configuring maximal watermark drift

2022-02-14 Thread GitBox


flinkbot edited a comment on pull request #18726:
URL: https://github.com/apache/flink/pull/18726#issuecomment-1036101042


   
   ## CI report:
   
   * 3e0350153d25b1ebac5254b968427a9b085787d6 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=31267)
 
   * 06102b172e575b0d2d927e32577fc04d6f9f00bf Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=31380)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #18689: [FLINK-21439][runtime] Exception history adaptive scheduler

2022-02-14 Thread GitBox


flinkbot edited a comment on pull request #18689:
URL: https://github.com/apache/flink/pull/18689#issuecomment-1033918208


   
   ## CI report:
   
   * 0666d52c8b1e8a45f53366bbcdcbfc5468c436a7 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=31343)
 
   * 5d1ce4952e98bd88315c899f612d2b1438af9f9a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #18741: [FLINK-26101][changelog] Avoid shared state registry to discard multi-registered identical changelog state

2022-02-14 Thread GitBox


flinkbot edited a comment on pull request #18741:
URL: https://github.com/apache/flink/pull/18741#issuecomment-1038577513


   
   ## CI report:
   
   * 6e78646183196da48d2389079f9ad8f16928f121 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=31360)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #18718: [FLINK-25782] [docs] Translate datastream filesystem.md page into Chi…

2022-02-14 Thread GitBox


flinkbot edited a comment on pull request #18718:
URL: https://github.com/apache/flink/pull/18718#issuecomment-1035898573


   
   ## CI report:
   
   * d296325814418283a0296448d2e62ac8397603f6 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=31365)
 
   * 0b9401f2635e6fd608583403a496f2bf8d67d990 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=31371)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Created] (FLINK-26116) FLIP-188: Implement Table Store V0.1

2022-02-14 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-26116:


 Summary: FLIP-188: Implement Table Store V0.1
 Key: FLINK-26116
 URL: https://issues.apache.org/jira/browse/FLINK-26116
 Project: Flink
  Issue Type: New Feature
  Components: Table Store
Reporter: Jingsong Lee
 Fix For: table-store-0.1.0








[jira] [Updated] (FLINK-25696) Introduce metadataConsumer to InitContext in Sink

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25696:
-
Parent: FLINK-25152
Issue Type: Sub-task  (was: New Feature)

> Introduce metadataConsumer to InitContext in Sink
> -
>
> Key: FLINK-25696
> URL: https://issues.apache.org/jira/browse/FLINK-25696
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream, Connectors / Kafka, Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> In Table Store, we want to get the offsets of the Kafka writer; only the 
> offsets returned by the callback inside the KafkaWriter are accurate, so we 
> need this callback mechanism.
> This ticket wants to add metadataConsumer to InitContext in Sink:
> {code:java}
> /**
>  * Returns a metadata consumer; the {@link SinkWriter} can publish metadata events of type
>  * {@link MetaT} to it. The consumer can accept metadata events in an asynchronous
>  * thread, and the {@link Consumer#accept} method must execute very fast.
>  */
> default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
> return Optional.empty();
> }{code}
> SinkWriter can get this consumer, and publish metadata to the consumer 
> implemented by table store sink.
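A minimal sketch of how a writer might use such a hook. The writer class and the String metadata are hypothetical stand-ins for illustration, not Flink's SinkWriter API or the real Kafka offset metadata type:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

public class MetadataAwareWriter {
    private final Optional<Consumer<String>> metadataConsumer;

    public MetadataAwareWriter(Optional<Consumer<String>> metadataConsumer) {
        this.metadataConsumer = metadataConsumer;
    }

    /** Invoked from the producer's (possibly asynchronous) acknowledgement callback. */
    public void onRecordAcknowledged(String topic, int partition, long offset) {
        // Publish only if the sink registered a consumer; accept() should be
        // cheap because it may run on the producer's I/O thread.
        metadataConsumer.ifPresent(c -> c.accept(topic + "-" + partition + "@" + offset));
    }

    public static void main(String[] args) {
        List<String> offsets = new ArrayList<>();
        MetadataAwareWriter writer = new MetadataAwareWriter(Optional.of(offsets::add));
        writer.onRecordAcknowledged("orders", 0, 42L);
        System.out.println(offsets); // [orders-0@42]
    }
}
```

A table store sink would pass its own consumer in, collecting the acknowledged offsets for its commit protocol; a sink that does not care simply passes `Optional.empty()`.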





[jira] [Updated] (FLINK-25619) Init flink-table-store repository

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25619:
-
Parent: (was: FLINK-25152)
Issue Type: New Feature  (was: Sub-task)

> Init flink-table-store repository
> -
>
> Key: FLINK-25619
> URL: https://issues.apache.org/jira/browse/FLINK-25619
> Project: Flink
>  Issue Type: New Feature
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> Create:
>  * README.md
>  * NOTICE LICENSE CODE_OF_CONDUCT
>  * .gitignore
>  * maven tools
>  * releasing tools
>  * github build workflow
>  * pom.xml





[jira] [Updated] (FLINK-25627) Add basic structures of file store in table-store

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25627:
-
Parent: FLINK-26116
Issue Type: Sub-task  (was: Bug)

> Add basic structures of file store in table-store
> -
>
> Key: FLINK-25627
> URL: https://issues.apache.org/jira/browse/FLINK-25627
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> Add basic structures of file store in table-store:
>  * Add OffsetRowData
>  * Add KeyValue and KeyValueSerializer
>  * Add MemTable and Accumulator





[jira] [Updated] (FLINK-25619) Init flink-table-store repository

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25619:
-
Parent: FLINK-26116
Issue Type: Sub-task  (was: New Feature)

> Init flink-table-store repository
> -
>
> Key: FLINK-25619
> URL: https://issues.apache.org/jira/browse/FLINK-25619
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> Create:
>  * README.md
>  * NOTICE LICENSE CODE_OF_CONDUCT
>  * .gitignore
>  * maven tools
>  * releasing tools
>  * github build workflow
>  * pom.xml





[jira] [Updated] (FLINK-25625) Introduce FileFormat for table-store

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25625:
-
Parent: FLINK-26116
Issue Type: Sub-task  (was: Bug)

> Introduce FileFormat for table-store
> 
>
> Key: FLINK-25625
> URL: https://issues.apache.org/jira/browse/FLINK-25625
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> Introduce a file format class which creates reader and writer factories for a 
> specific file format.





[jira] [Updated] (FLINK-25625) Introduce FileFormat for table-store

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25625:
-
Parent: (was: FLINK-25152)
Issue Type: Bug  (was: Sub-task)

> Introduce FileFormat for table-store
> 
>
> Key: FLINK-25625
> URL: https://issues.apache.org/jira/browse/FLINK-25625
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> Introduce a file format class which creates reader and writer factories for a 
> specific file format.





[jira] [Updated] (FLINK-25627) Add basic structures of file store in table-store

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25627:
-
Parent: (was: FLINK-25152)
Issue Type: Bug  (was: Sub-task)

> Add basic structures of file store in table-store
> -
>
> Key: FLINK-25627
> URL: https://issues.apache.org/jira/browse/FLINK-25627
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> Add basic structures of file store in table-store:
>  * Add OffsetRowData
>  * Add KeyValue and KeyValueSerializer
>  * Add MemTable and Accumulator





[jira] [Updated] (FLINK-25628) Introduce RecordReader and related classes for table store

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25628:
-
Parent: (was: FLINK-25152)
Issue Type: Bug  (was: Sub-task)

> Introduce RecordReader and related classes for table store
> --
>
> Key: FLINK-25628
> URL: https://issues.apache.org/jira/browse/FLINK-25628
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> - Introduce RecordReader interface: the reader that reads batches of records.
> - Introduce SortMergeReader: this reader reads a list of `RecordReader`s, each 
> already sorted by key and sequence number, and performs a sort merge 
> algorithm. `KeyValue`s with the same key are also combined during sort 
> merging.
> - Introduce ConcatRecordReader: this reader concatenates a list of 
> `RecordReader`s and reads them sequentially. The input list is already sorted 
> by key and sequence number, and the key intervals do not overlap each other.
> - Introduce FieldStats: Statistics for each field.
> - Introduce SstPathFactory: Factory which produces new Path for sst files.
> - Introduce SstFile and SstFileMeta: This SstFile includes several 
> `KeyValue`, representing the changes inserted into the file storage.
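As a rough illustration of the sort-merge idea above, the following self-contained sketch merges key-sorted runs and keeps only the entry with the highest sequence number per key. The `Kv` record, `sortMerge` method, and the "keep latest" combine policy are illustrative assumptions, not the table store's actual API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class SortMergeSketch {
    // A (key, sequenceNumber, value) triple, mirroring the KeyValue idea.
    record Kv(int key, long seq, String value) {}

    // Merge key-sorted runs; for duplicate keys keep the value with the
    // highest sequence number (a trivial "combine during merging" policy).
    static List<Kv> sortMerge(List<List<Kv>> runs) {
        TreeMap<Integer, Kv> latest = new TreeMap<>();
        for (List<Kv> run : runs) {
            for (Kv kv : run) {
                Kv cur = latest.get(kv.key());
                if (cur == null || kv.seq() > cur.seq()) {
                    latest.put(kv.key(), kv);
                }
            }
        }
        // TreeMap iteration yields the merged output in key order.
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<Kv> run1 = List.of(new Kv(1, 1, "a1"), new Kv(3, 2, "c1"));
        List<Kv> run2 = List.of(new Kv(1, 4, "a2"), new Kv(2, 3, "b1"));
        System.out.println(sortMerge(List.of(run1, run2)));
    }
}
```

A real SortMergeReader would stream the runs through a heap of iterators instead of materializing a map, but the merge semantics are the same.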





[GitHub] [flink] Myasuka commented on a change in pull request #18741: [FLINK-26101][changelog] Avoid shared state registry to discard multi-registered identical changelog state

2022-02-14 Thread GitBox


Myasuka commented on a change in pull request #18741:
URL: https://github.com/apache/flink/pull/18741#discussion_r805603444



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
##
@@ -237,6 +237,23 @@ public FSDataInputStream openInputStream() throws 
IOException {
 public Optional<byte[]> asBytesIfInMemory() {
 throw new UnsupportedOperationException("Should not call here.");
 }
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+StreamStateHandleWrapper that = (StreamStateHandleWrapper) o;
+return Objects.equals(keyedStateHandle, that.keyedStateHandle);

Review comment:
   The `PlaceholderStreamStateHandle` itself is not the materialized keyed 
state handle, but part of it. Actually, I do not understand in which case we 
would have two or more materializations; in the case of restoring?
   
   Or in other words, what test will fail if we do not compare the state handle id?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] metaswirl commented on a change in pull request #18689: [FLINK-21439][runtime] Exception history adaptive scheduler

2022-02-14 Thread GitBox


metaswirl commented on a change in pull request #18689:
URL: https://github.com/apache/flink/pull/18689#discussion_r805603365



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##
@@ -306,22 +331,88 @@ void deliverOperatorEventToCoordinator(
 operatorId, request);
 }
 
+/** Transition to different state when failure occurs. Stays in the same 
state by default. */
+abstract void onFailure(Throwable cause);
+
+/**
+ * Transition to different state when the execution graph reaches a 
globally terminal state.
+ *
+ * @param globallyTerminalState globally terminal state which the 
execution graph reached
+ */
+abstract void onGloballyTerminalState(JobStatus globallyTerminalState);
+
+@Override
+public void handleGlobalFailure(Throwable cause) {
+failureCollection.add(new GlobalFailure(cause));
+onFailure(cause);
+}
+
 /**
  * Updates the execution graph with the given task execution state 
transition.
  *
  * @param taskExecutionStateTransition taskExecutionStateTransition to 
update the ExecutionGraph
  * with
  * @return {@code true} if the update was successful; otherwise {@code 
false}
  */
-abstract boolean updateTaskExecutionState(
-TaskExecutionStateTransition taskExecutionStateTransition);
+boolean updateTaskExecutionState(TaskExecutionStateTransition 
taskExecutionStateTransition) {
+if (taskExecutionStateTransition.getExecutionState() != 
ExecutionState.FAILED) {
+return 
getExecutionGraph().updateState(taskExecutionStateTransition);
+}
 
-/**
- * Callback which is called once the execution graph reaches a globally 
terminal state.
- *
- * @param globallyTerminalState globally terminal state which the 
execution graph reached
- */
-abstract void onGloballyTerminalState(JobStatus globallyTerminalState);
+// We need to collect the ExecutionVertexID before updating the state, 
because the Execution
+// is de-registered afterwards.
+// We need to use an optional here, because this method can be called 
even after the
+// Execution is de-registered.
+Optional<ExecutionVertexID> idOpt =
+        executionGraph.findExecutionVertexId(taskExecutionStateTransition.getID());
+final boolean successfulUpdate =
+getExecutionGraph().updateState(taskExecutionStateTransition);
+if (!successfulUpdate) {
+return false;
+}
+
+Throwable cause = 
extractErrorOrUseDefault(taskExecutionStateTransition);
+
+checkState(idOpt.isPresent());
+ExecutionVertexID id = idOpt.get();
+
+if (getNonEmptyExecution(id).getFailureInfo().isPresent()) {
+failureCollection.add(new LocalFailure(cause, id));
+}
+onFailure(cause);
+return true;

Review comment:
   Looks better than my version. I will take yours.
   
   Just one thing. The location of the return statements changes the semantic 
of the method slightly from before. Is this on purpose?
   
   Previously `desiredState = "FAILED"` and `newState = "CANCELLING"` would 
return `true`. Now they return `false`. 








[jira] [Updated] (FLINK-25629) Introduce CompactStrategy and CompactManager for table store

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25629:
-
Parent: (was: FLINK-25152)
Issue Type: Bug  (was: Sub-task)

> Introduce CompactStrategy and CompactManager for table store
> 
>
> Key: FLINK-25629
> URL: https://issues.apache.org/jira/browse/FLINK-25629
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> - Introduce IntervalPartition: an algorithm to partition several sst files into 
> the minimum number of `SortedRun`s.
> - Introduce UniversalCompaction CompactStrategy: a compaction style targeting 
> use cases that require lower write amplification, trading off read 
> amplification and space amplification.
> - Introduce CompactManager: a manager to submit compaction tasks.
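A minimal sketch of the size-ratio trigger behind universal-style compaction: starting from the newest sorted run, keep absorbing the next older run while it is not much larger than everything absorbed so far. The method name and the ratio parameter are illustrative assumptions, not the table store's actual configuration.

```java
public class UniversalCompactionSketch {

    // runSizes[0] is the newest sorted run. Returns how many of the newest
    // runs should be merged into one, or 0 if no compaction is triggered.
    static int pickRunsToMerge(long[] runSizes, double sizeRatio) {
        if (runSizes.length == 0) {
            return 0;
        }
        int picked = 1;
        long total = runSizes[0];
        // Absorb the next older run while it is at most sizeRatio times the
        // combined size of the already-picked runs.
        while (picked < runSizes.length && runSizes[picked] <= total * sizeRatio) {
            total += runSizes[picked];
            picked++;
        }
        // Merging a single run is a no-op, so require at least two.
        return picked > 1 ? picked : 0;
    }
}
```

With run sizes {1, 1, 1, 8} and ratio 1.25, the three small new runs are merged while the large old run is left alone; rewriting small runs instead of large ones is what keeps write amplification low.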





[jira] [Updated] (FLINK-25630) Introduce MergeTree writer and reader for table store

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25630:
-
Parent: (was: FLINK-25152)
Issue Type: Bug  (was: Sub-task)

> Introduce MergeTree writer and reader for table store
> -
>
> Key: FLINK-25630
> URL: https://issues.apache.org/jira/browse/FLINK-25630
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>






[jira] [Updated] (FLINK-25643) Introduce Predicate to table store

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25643:
-
Parent: (was: FLINK-25152)
Issue Type: Bug  (was: Sub-task)

> Introduce Predicate to table store
> --
>
> Key: FLINK-25643
> URL: https://issues.apache.org/jira/browse/FLINK-25643
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> Flink's Expression is not serializable. Although it has asSerializableString, 
> that method is not implemented by all Expressions, and deserialization 
> requires many parameters.
> So the table store introduces Predicate, a serializable class used for filter 
> and partition pushdown.
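The motivation above can be illustrated with a minimal sketch of a serializable predicate tree. The names `equal`, `and`, and `roundTrip` are hypothetical, and the real table store API will differ; the point is only that lambdas targeting a Serializable functional interface are themselves serializable, which is what a pushed-down filter needs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

public interface Predicate extends Serializable {
    boolean test(Object[] row);

    // Leaf predicate: field at fieldIndex equals a (serializable) literal.
    static Predicate equal(int fieldIndex, Object literal) {
        return row -> Objects.equals(row[fieldIndex], literal);
    }

    // Conjunction of two predicates.
    default Predicate and(Predicate other) {
        return row -> this.test(row) && other.test(row);
    }

    // Round-trips a predicate through Java serialization, as a planner
    // shipping a pushed-down filter to a source/sink would.
    static Predicate roundTrip(Predicate p) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(p);
            out.flush();
            ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            return (Predicate) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```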





[jira] [Updated] (FLINK-25628) Introduce RecordReader and related classes for table store

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25628:
-
Parent: FLINK-26116
Issue Type: Sub-task  (was: Bug)

> Introduce RecordReader and related classes for table store
> --
>
> Key: FLINK-25628
> URL: https://issues.apache.org/jira/browse/FLINK-25628
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> - Introduce RecordReader interface: the reader that reads batches of records.
> - Introduce SortMergeReader: this reader reads a list of `RecordReader`s, each 
> already sorted by key and sequence number, and performs a sort merge 
> algorithm. `KeyValue`s with the same key are also combined during sort 
> merging.
> - Introduce ConcatRecordReader: this reader concatenates a list of 
> `RecordReader`s and reads them sequentially. The input list is already sorted 
> by key and sequence number, and the key intervals do not overlap each other.
> - Introduce FieldStats: Statistics for each field.
> - Introduce SstPathFactory: Factory which produces new Path for sst files.
> - Introduce SstFile and SstFileMeta: This SstFile includes several 
> `KeyValue`, representing the changes inserted into the file storage.





[jira] [Updated] (FLINK-25629) Introduce CompactStrategy and CompactManager for table store

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25629:
-
Parent: FLINK-26116
Issue Type: Sub-task  (was: Bug)

> Introduce CompactStrategy and CompactManager for table store
> 
>
> Key: FLINK-25629
> URL: https://issues.apache.org/jira/browse/FLINK-25629
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> - Introduce IntervalPartition: an algorithm to partition several sst files into 
> the minimum number of `SortedRun`s.
> - Introduce UniversalCompaction CompactStrategy: a compaction style targeting 
> use cases that require lower write amplification, trading off read 
> amplification and space amplification.
> - Introduce CompactManager: a manager to submit compaction tasks.





[jira] [Updated] (FLINK-25630) Introduce MergeTree writer and reader for table store

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25630:
-
Parent: FLINK-26116
Issue Type: Sub-task  (was: Bug)

> Introduce MergeTree writer and reader for table store
> -
>
> Key: FLINK-25630
> URL: https://issues.apache.org/jira/browse/FLINK-25630
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>






[jira] [Updated] (FLINK-25643) Introduce Predicate to table store

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25643:
-
Parent: FLINK-26116
Issue Type: Sub-task  (was: Bug)

> Introduce Predicate to table store
> --
>
> Key: FLINK-25643
> URL: https://issues.apache.org/jira/browse/FLINK-25643
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> Flink's Expression is not serializable. Although it has asSerializableString, 
> that method is not implemented by all Expressions, and deserialization 
> requires many parameters.
> So the table store introduces Predicate, a serializable class used for filter 
> and partition pushdown.





[GitHub] [flink] Myasuka commented on pull request #18224: [FLINK-25143][test] Add ITCase for periodic materialization

2022-02-14 Thread GitBox


Myasuka commented on pull request #18224:
URL: https://github.com/apache/flink/pull/18224#issuecomment-1038798674


   FYI, my local CI to run this PR passed: 
https://myasuka.visualstudio.com/flink/_build/results?buildId=405&view=results






[GitHub] [flink] dmvk commented on a change in pull request #18689: [FLINK-21439][runtime] Exception history adaptive scheduler

2022-02-14 Thread GitBox


dmvk commented on a change in pull request #18689:
URL: https://github.com/apache/flink/pull/18689#discussion_r805604192



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##
@@ -306,22 +331,88 @@ void deliverOperatorEventToCoordinator(
 operatorId, request);
 }
 
+/** Transition to different state when failure occurs. Stays in the same 
state by default. */
+abstract void onFailure(Throwable cause);
+
+/**
+ * Transition to different state when the execution graph reaches a 
globally terminal state.
+ *
+ * @param globallyTerminalState globally terminal state which the 
execution graph reached
+ */
+abstract void onGloballyTerminalState(JobStatus globallyTerminalState);
+
+@Override
+public void handleGlobalFailure(Throwable cause) {
+failureCollection.add(new GlobalFailure(cause));
+onFailure(cause);
+}
+
 /**
  * Updates the execution graph with the given task execution state 
transition.
  *
  * @param taskExecutionStateTransition taskExecutionStateTransition to 
update the ExecutionGraph
  * with
  * @return {@code true} if the update was successful; otherwise {@code 
false}
  */
-abstract boolean updateTaskExecutionState(
-TaskExecutionStateTransition taskExecutionStateTransition);
+boolean updateTaskExecutionState(TaskExecutionStateTransition 
taskExecutionStateTransition) {

Review comment:
   What exactly is there left to untangle? It feels that all the corner 
cases have already been addressed in the default scheduler 
(https://github.com/apache/flink/pull/18689#discussion_r805592620 is mostly 
taken over from there, excluding the notifications on the finished state that 
are needed to release partitions for batch jobs).
   
   It doesn't really seem all that complex in the context of the scheduler (a 
few comments might help, though).








[jira] [Updated] (FLINK-25644) Introduce interfaces between file-table-store and flink connector sink

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25644:
-
Parent: FLINK-26116
Issue Type: Sub-task  (was: Bug)

> Introduce interfaces between file-table-store and flink connector sink
> --
>
> Key: FLINK-25644
> URL: https://issues.apache.org/jira/browse/FLINK-25644
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> We should introduce a FileStore interface that provides the following operations:
>  * FileStoreWrite: write operation which provides RecordWriter creation.
>  * FileStoreCommit: commit operation which provides commit and overwrite.
>  * FileStoreExpire: expire operation which expires old snapshots.





[jira] [Updated] (FLINK-25644) Introduce interfaces between file-table-store and flink connector sink

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25644:
-
Parent: (was: FLINK-25152)
Issue Type: Bug  (was: Sub-task)

> Introduce interfaces between file-table-store and flink connector sink
> --
>
> Key: FLINK-25644
> URL: https://issues.apache.org/jira/browse/FLINK-25644
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> We should introduce a FileStore interface that provides the following operations:
>  * FileStoreWrite: write operation which provides RecordWriter creation.
>  * FileStoreCommit: commit operation which provides commit and overwrite.
>  * FileStoreExpire: expire operation which expires old snapshots.





[jira] [Updated] (FLINK-25680) Introduce Table Store Flink Sink

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25680:
-
Parent: FLINK-26116
Issue Type: Sub-task  (was: Bug)

> Introduce Table Store Flink Sink
> 
>
> Key: FLINK-25680
> URL: https://issues.apache.org/jira/browse/FLINK-25680
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> Introduce Table Store Sink with SinkWriter and GlobalCommitter.





[jira] [Updated] (FLINK-25680) Introduce Table Store Flink Sink

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25680:
-
Parent: (was: FLINK-25152)
Issue Type: Bug  (was: Sub-task)

> Introduce Table Store Flink Sink
> 
>
> Key: FLINK-25680
> URL: https://issues.apache.org/jira/browse/FLINK-25680
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> Introduce Table Store Sink with SinkWriter and GlobalCommitter.





[GitHub] [flink] metaswirl commented on a change in pull request #18689: [FLINK-21439][runtime] Exception history adaptive scheduler

2022-02-14 Thread GitBox


metaswirl commented on a change in pull request #18689:
URL: https://github.com/apache/flink/pull/18689#discussion_r805604493



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##
@@ -306,22 +331,88 @@ void deliverOperatorEventToCoordinator(
 operatorId, request);
 }
 
+/** Transition to different state when failure occurs. Stays in the same 
state by default. */
+abstract void onFailure(Throwable cause);
+
+/**
+ * Transition to different state when the execution graph reaches a 
globally terminal state.
+ *
+ * @param globallyTerminalState globally terminal state which the 
execution graph reached
+ */
+abstract void onGloballyTerminalState(JobStatus globallyTerminalState);
+
+@Override
+public void handleGlobalFailure(Throwable cause) {
+failureCollection.add(new GlobalFailure(cause));
+onFailure(cause);
+}
+
 /**
  * Updates the execution graph with the given task execution state 
transition.
  *
  * @param taskExecutionStateTransition taskExecutionStateTransition to 
update the ExecutionGraph
  * with
  * @return {@code true} if the update was successful; otherwise {@code 
false}
  */
-abstract boolean updateTaskExecutionState(
-TaskExecutionStateTransition taskExecutionStateTransition);
+boolean updateTaskExecutionState(TaskExecutionStateTransition 
taskExecutionStateTransition) {
+if (taskExecutionStateTransition.getExecutionState() != 
ExecutionState.FAILED) {
+return 
getExecutionGraph().updateState(taskExecutionStateTransition);
+}
 
-/**
- * Callback which is called once the execution graph reaches a globally 
terminal state.
- *
- * @param globallyTerminalState globally terminal state which the 
execution graph reached
- */
-abstract void onGloballyTerminalState(JobStatus globallyTerminalState);
+// We need to collect the ExecutionVertexID before updating the state, 
because the Execution
+// is de-registered afterwards.
+// We need to use an optional here, because this method can be called 
even after the
+// Execution is de-registered.
+Optional<ExecutionVertexID> idOpt =
+        executionGraph.findExecutionVertexId(taskExecutionStateTransition.getID());
+final boolean successfulUpdate =
+getExecutionGraph().updateState(taskExecutionStateTransition);
+if (!successfulUpdate) {
+return false;
+}
+
+Throwable cause = 
extractErrorOrUseDefault(taskExecutionStateTransition);
+
+checkState(idOpt.isPresent());
+ExecutionVertexID id = idOpt.get();
+
+if (getNonEmptyExecution(id).getFailureInfo().isPresent()) {
+failureCollection.add(new LocalFailure(cause, id));
+}
+onFailure(cause);
+return true;

Review comment:
   How about, we use this:
   
   ```java
    boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionStateTransition) {
        final Optional<ExecutionVertexID> maybeExecutionVertexId =
                executionGraph.findExecutionVertexId(taskExecutionStateTransition.getID());
        if (!getExecutionGraph().updateState(taskExecutionStateTransition)) {
            return false;
        }

        // We're sure that the executionVertexId has been found, because we've been able to
        // update the execution graph.
        final ExecutionVertexID executionVertexId =
                maybeExecutionVertexId.orElseThrow(NoSuchElementException::new);
        final ExecutionState desiredState = taskExecutionStateTransition.getExecutionState();
        final ExecutionState newState =
                executionGraph
                        .findExecution(executionVertexId)
                        .map(Execution::getState)
                        .orElseThrow(NoSuchElementException::new);
        // We only want a notification for the actual transition into the FAILED state.
        if (desiredState == ExecutionState.FAILED && desiredState == newState) {
            final Throwable cause = extractErrorOrUseDefault(taskExecutionStateTransition);
            if (getNonEmptyExecution(executionVertexId).getFailureInfo().isPresent()) {
                failureCollection.add(new LocalFailure(cause, executionVertexId));
            }
            onFailure(cause);
        }
        return true;
    }
   ```
   








[GitHub] [flink] Myasuka edited a comment on pull request #18224: [FLINK-25143][test] Add ITCase for periodic materialization

2022-02-14 Thread GitBox


Myasuka edited a comment on pull request #18224:
URL: https://github.com/apache/flink/pull/18224#issuecomment-1038798674


   FYI, my local CI to run this PR passed: 
https://myasuka.visualstudio.com/flink/_build/results?buildId=405&view=results
   
   @rkhachatryan , I think you could take another round to review this PR.






[jira] [Updated] (FLINK-25803) Implement partition and bucket filter of FileStoreScanImpl

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25803:
-
Parent: (was: FLINK-25152)
Issue Type: Bug  (was: Sub-task)

> Implement partition and bucket filter of FileStoreScanImpl
> --
>
> Key: FLINK-25803
> URL: https://issues.apache.org/jira/browse/FLINK-25803
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Implement partition and bucket filter in FileStoreScanImpl.
> Also perform some small optimizations such as concurrent reads in 
> FileStoreScanImpl.





[jira] [Updated] (FLINK-25689) Introduce atomic commit

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25689:
-
Parent: FLINK-26116
Issue Type: Sub-task  (was: Bug)

> Introduce atomic commit
> ---
>
> Key: FLINK-25689
> URL: https://issues.apache.org/jira/browse/FLINK-25689
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> This ticket introduces an atomic commit transaction for the file store.
>  # Before calling {{{}FileStoreCommitImpl#commit{}}}, the user should first call 
> {{FileStoreCommitImpl#filterCommitted}} to make sure this commit has not been 
> done before.
>  # Before committing, it first checks for conflicts by checking that all files 
> to be removed currently exist.
>  # After that it uses the external {{FileStoreCommitImpl#lock}} (if provided) 
> or the atomic rename of the file system to ensure atomicity.
>  # If the commit fails due to conflicts or an exception, it tries its best to 
> clean up and aborts.
>  # If the atomic rename fails, it tries again after reading the latest snapshot 
> from step 2.
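The rename-based path in steps 3 and 5 can be sketched as follows, a minimal simulation assuming a file system where rename-without-overwrite fails when the target exists. The names (`tryCommit`, the `snapshot-N` layout) are illustrative; the real FileStoreCommitImpl also performs conflict checks and may use an external lock instead, and a real implementation would use committer-unique temp file names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

public class AtomicCommitSketch {

    // Attempts to publish snapshot `snapshotId`; returns false if another
    // committer already claimed that snapshot id, in which case the caller
    // re-reads the latest snapshot and retries with the next id (step 5).
    static boolean tryCommit(Path snapshotDir, long snapshotId, String manifest) {
        Path tmp = snapshotDir.resolve("snapshot-" + snapshotId + ".tmp");
        Path target = snapshotDir.resolve("snapshot-" + snapshotId);
        try {
            Files.writeString(tmp, manifest);
            // Plain move fails with FileAlreadyExistsException if the target
            // exists, so at most one committer can create each snapshot file.
            Files.move(tmp, target);
            return true;
        } catch (FileAlreadyExistsException e) {
            try {
                Files.deleteIfExists(tmp); // best-effort cleanup (step 4)
            } catch (IOException ignored) {
            }
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("commit-sketch");
        long id = 1;
        while (!tryCommit(dir, id, "manifest-list")) {
            id++; // retry against the next free snapshot id
        }
        System.out.println("committed snapshot " + id);
    }
}
```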





[jira] [Updated] (FLINK-25689) Introduce atomic commit

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25689:
-
Parent: (was: FLINK-25152)
Issue Type: Bug  (was: Sub-task)

> Introduce atomic commit
> ---
>
> Key: FLINK-25689
> URL: https://issues.apache.org/jira/browse/FLINK-25689
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> This ticket introduces an atomic commit transaction for file store.
>  # Before calling {{{}FileStoreCommitImpl#commit{}}}, the user should first 
> call {{FileStoreCommitImpl#filterCommitted}} to make sure this commit has not 
> been done before.
>  # Before committing, it will first check for conflicts by checking that all 
> files to be removed currently exist.
>  # After that it uses the external {{FileStoreCommitImpl#lock}} (if provided) 
> or the atomic rename of the file system to ensure atomicity.
>  # If the commit fails due to conflicts or an exception, it tries its best to 
> clean up and abort.
>  # If the atomic rename fails, it retries after reading the latest snapshot 
> from step 2.
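The five steps above can be modeled in a few lines. The sketch below is a simplified in-memory illustration of the protocol; the class and its members are hypothetical, not the actual FileStoreCommitImpl API, and a Java monitor stands in for what the real store does with an external lock or an atomic file rename (retrying from the latest snapshot if the rename loses the race):

```java
import java.util.HashSet;
import java.util.Set;

// In-memory model of the commit protocol described above; names are hypothetical.
class AtomicCommitSketch {
    private final Set<String> committed = new HashSet<>(); // commit ids already applied
    private final Set<String> liveFiles = new HashSet<>(); // files currently in the store

    AtomicCommitSketch(Set<String> initialFiles) {
        liveFiles.addAll(initialFiles);
    }

    /** Returns true if the commit is (or already was) applied, false on conflict. */
    synchronized boolean commit(String commitId, Set<String> remove, Set<String> add) {
        // Step 1: filter already-committed ids so a retried commit is a no-op.
        if (committed.contains(commitId)) {
            return true;
        }
        // Step 2: conflict check - every file to be removed must still exist.
        if (!liveFiles.containsAll(remove)) {
            return false; // the caller aborts and cleans up
        }
        // Steps 3-5: publish the new snapshot atomically (here: under the monitor).
        liveFiles.removeAll(remove);
        liveFiles.addAll(add);
        committed.add(commitId);
        return true;
    }

    synchronized Set<String> files() {
        return new HashSet<>(liveFiles);
    }
}
```

The filter in step 1 is what makes retrying a commit with an unclear outcome safe: a commit id that already succeeded is simply acknowledged again.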



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25820) Introduce Table Store Flink Source

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25820:
-
Parent: FLINK-26116
Issue Type: Sub-task  (was: Bug)

> Introduce Table Store Flink Source
> --
>
> Key: FLINK-25820
> URL: https://issues.apache.org/jira/browse/FLINK-25820
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> Introduce FLIP-27 source implementation for table file store.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25820) Introduce Table Store Flink Source

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25820:
-
Parent: (was: FLINK-25152)
Issue Type: Bug  (was: Sub-task)

> Introduce Table Store Flink Source
> --
>
> Key: FLINK-25820
> URL: https://issues.apache.org/jira/browse/FLINK-25820
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> Introduce FLIP-27 source implementation for table file store.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25803) Implement partition and bucket filter of FileStoreScanImpl

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25803:
-
Parent: FLINK-26116
Issue Type: Sub-task  (was: Bug)

> Implement partition and bucket filter of FileStoreScanImpl
> --
>
> Key: FLINK-25803
> URL: https://issues.apache.org/jira/browse/FLINK-25803
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Implement partition and bucket filter in FileStoreScanImpl.
> Also perform some small optimizations such as concurrent reads in 
> FileStoreScanImpl.
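As an illustration of what partition and bucket filtering amounts to, here is a minimal sketch; the Entry type and its fields are invented for the example, not the real FileStoreScanImpl types. A scan keeps only the manifest entries whose partition and bucket pass the pushed-down predicates:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Illustrative model of filtering manifest entries by partition and bucket.
class ScanFilterSketch {
    static final class Entry {
        final String partition;
        final int bucket;
        final String file;

        Entry(String partition, int bucket, String file) {
            this.partition = partition;
            this.bucket = bucket;
            this.file = file;
        }
    }

    /** Keeps only entries whose partition and bucket pass the pushed-down filters. */
    static List<Entry> scan(List<Entry> entries,
                            Predicate<String> partitionFilter,
                            Predicate<Integer> bucketFilter) {
        List<Entry> result = new ArrayList<>();
        for (Entry e : entries) {
            if (partitionFilter.test(e.partition) && bucketFilter.test(e.bucket)) {
                result.add(e);
            }
        }
        return result;
    }
}
```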



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25876) Implement overwrite in FileStoreScanImpl

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25876:
-
Parent: (was: FLINK-25152)
Issue Type: Bug  (was: Sub-task)

> Implement overwrite in FileStoreScanImpl
> 
>
> Key: FLINK-25876
> URL: https://issues.apache.org/jira/browse/FLINK-25876
> Project: Flink
>  Issue Type: Bug
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
>
> Overwrite is a useful transaction for batch jobs to completely update a 
> partition for data correction. Currently FileStoreScanImpl does not implement 
> this transaction, so we need to add it.
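Conceptually, overwriting a partition means that a single commit drops every existing file of the target partition and registers the newly written files, leaving other partitions untouched. A minimal sketch of that semantics, with hypothetical types rather than the real table store API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: "files" maps partition -> data files of that partition.
class OverwriteSketch {
    /** Returns the mapping after one overwrite commit on a single partition. */
    static Map<String, List<String>> overwrite(
            Map<String, List<String>> files, String partition, List<String> newFiles) {
        Map<String, List<String>> result = new HashMap<>(files);
        // The commit removes all previous files of the partition and adds the
        // new ones; other partitions are untouched.
        result.put(partition, new ArrayList<>(newFiles));
        return result;
    }
}
```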



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25876) Implement overwrite in FileStoreScanImpl

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25876:
-
Parent: FLINK-26116
Issue Type: Sub-task  (was: Bug)

> Implement overwrite in FileStoreScanImpl
> 
>
> Key: FLINK-25876
> URL: https://issues.apache.org/jira/browse/FLINK-25876
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
>
> Overwrite is a useful transaction for batch jobs to completely update a 
> partition for data correction. Currently FileStoreScanImpl does not implement 
> this transaction, so we need to add it.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25994) Implement FileStoreExpire

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25994:
-
Parent: FLINK-26116
Issue Type: Sub-task  (was: Bug)

> Implement FileStoreExpire
> -
>
> Key: FLINK-25994
> URL: https://issues.apache.org/jira/browse/FLINK-25994
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Currently FileStoreExpire does not have an implementation. We need an 
> implementation to clean up old snapshots and related files.
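A common expiration strategy, shown here purely as an illustration of what such an implementation might do (the actual retention policy may differ): keep the newest N snapshots and delete any file that is referenced exclusively by expired snapshots.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative retention sketch; not the real FileStoreExpire implementation.
class ExpireSketch {
    /** snapshots is oldest-first; each set lists the files a snapshot references. */
    static Set<String> filesToDelete(List<Set<String>> snapshots, int numRetained) {
        int firstRetained = Math.max(0, snapshots.size() - numRetained);
        // Files still referenced by a retained snapshot must survive.
        Set<String> retainedFiles = new HashSet<>();
        for (int i = firstRetained; i < snapshots.size(); i++) {
            retainedFiles.addAll(snapshots.get(i));
        }
        // Anything referenced only by expired snapshots can be deleted.
        Set<String> delete = new HashSet<>();
        for (int i = 0; i < firstRetained; i++) {
            for (String f : snapshots.get(i)) {
                if (!retainedFiles.contains(f)) {
                    delete.add(f);
                }
            }
        }
        return delete;
    }
}
```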



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26031) Support projection pushdown on keys and values in sst file readers

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-26031:
-
Parent: (was: FLINK-25152)
Issue Type: Bug  (was: Sub-task)

> Support projection pushdown on keys and values in sst file readers
> --
>
> Key: FLINK-26031
> URL: https://issues.apache.org/jira/browse/FLINK-26031
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Projection pushdown is an optimization for sources. With this optimization, 
> we can avoid reading useless columns and thus improve performance.
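At its core, projection pushdown means the reader materializes only the requested column indices instead of full rows. A tiny sketch with a row-as-array model, for illustration only (not the actual sst reader API):

```java
// Row-as-Object[] model; "projection" lists the column indices the query needs.
class ProjectionSketch {
    static Object[] project(Object[] row, int[] projection) {
        Object[] out = new Object[projection.length];
        for (int i = 0; i < projection.length; i++) {
            out[i] = row[projection[i]]; // copy only the requested columns
        }
        return out;
    }
}
```

In a real reader the same idea is applied before deserialization, so the skipped columns are never decoded at all.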



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26031) Support projection pushdown on keys and values in sst file readers

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-26031:
-
Parent: FLINK-26116
Issue Type: Sub-task  (was: Bug)

> Support projection pushdown on keys and values in sst file readers
> --
>
> Key: FLINK-26031
> URL: https://issues.apache.org/jira/browse/FLINK-26031
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Projection pushdown is an optimization for sources. With this optimization, 
> we can avoid reading useless columns and thus improve performance.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25994) Implement FileStoreExpire

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25994:
-
Parent: (was: FLINK-25152)
Issue Type: Bug  (was: Sub-task)

> Implement FileStoreExpire
> -
>
> Key: FLINK-25994
> URL: https://issues.apache.org/jira/browse/FLINK-25994
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Currently FileStoreExpire does not have an implementation. We need an 
> implementation to clean up old snapshots and related files.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26103) Introduce log store

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-26103:
-
Parent: (was: FLINK-25152)
Issue Type: Bug  (was: Sub-task)

> Introduce log store
> ---
>
> Key: FLINK-26103
> URL: https://issues.apache.org/jira/browse/FLINK-26103
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> Introduce log store:
>  * Introduce log store interfaces
>  * Implement Kafka log store
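As a rough illustration of the contract a log store has to fulfill (an append-only, offset-addressable record log, which a Kafka topic provides naturally), here is a minimal in-memory stand-in; the class and method names are invented for the example:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for a log store: an append-only record log addressed by offset.
class InMemoryLogSketch {
    private final List<String> records = new ArrayList<>();

    /** Appends one record and returns its offset, like a Kafka producer ack. */
    synchronized long append(String record) {
        records.add(record);
        return records.size() - 1;
    }

    /** Reads all records from the given offset (inclusive), like a Kafka consumer. */
    synchronized List<String> readFrom(long offset) {
        return new ArrayList<>(records.subList((int) offset, records.size()));
    }
}
```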



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26103) Introduce log store

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-26103:
-
Parent: FLINK-26116
Issue Type: Sub-task  (was: Bug)

> Introduce log store
> ---
>
> Key: FLINK-26103
> URL: https://issues.apache.org/jira/browse/FLINK-26103
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.1.0
>
>
> Introduce log store:
>  * Introduce log store interfaces
>  * Implement Kafka log store



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26066) Introduce FileStoreRead

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-26066:
-
Parent: (was: FLINK-25152)
Issue Type: Bug  (was: Sub-task)

> Introduce FileStoreRead
> ---
>
> Key: FLINK-26066
> URL: https://issues.apache.org/jira/browse/FLINK-26066
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Apart from {{FileStoreWrite}}, we also need a {{FileStoreRead}} operation to 
> read actual key-values for a specific partition and bucket.
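One core piece of such a read path is merging entries for the same key, where the entry with the highest sequence number wins. A minimal sketch of that merge rule, with illustrative types rather than the actual FileStoreRead API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative merge of key-value entries; each entry is {key, value, sequenceNumber}.
class ReadMergeSketch {
    /** Among entries with the same key, the highest sequence number wins. */
    static Map<String, String> merge(List<String[]> entries) {
        Map<String, Long> seq = new HashMap<>();
        Map<String, String> result = new HashMap<>();
        for (String[] e : entries) {
            long s = Long.parseLong(e[2]);
            Long cur = seq.get(e[0]);
            if (cur == null || s > cur) {
                seq.put(e[0], s);
                result.put(e[0], e[1]); // newer entry for this key wins
            }
        }
        return result;
    }
}
```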



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25771) CassandraConnectorITCase.testRetrialAndDropTables timeouts on AZP

2022-02-14 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17491864#comment-17491864
 ] 

Yun Gao commented on FLINK-25771:
-

Hi, sorry, the issue seems to have reproduced after the fix was merged: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=31223&view=logs&j=d44f43ce-542c-597d-bf94-b0718c71e5e8&t=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d&l=12625


> CassandraConnectorITCase.testRetrialAndDropTables timeouts on AZP
> -
>
> Key: FLINK-25771
> URL: https://issues.apache.org/jira/browse/FLINK-25771
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Cassandra
>Affects Versions: 1.15.0, 1.13.5, 1.14.3
>Reporter: Till Rohrmann
>Assignee: Etienne Chauchot
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.15.0, 1.13.6, 1.14.4
>
>
> The test {{CassandraConnectorITCase.testRetrialAndDropTables}} fails on AZP 
> with
> {code}
> Jan 23 01:02:52 com.datastax.driver.core.exceptions.NoHostAvailableException: 
> All host(s) tried for query failed (tried: /172.17.0.1:59220 
> (com.datastax.driver.core.exceptions.OperationTimedOutException: 
> [/172.17.0.1] Timed out waiting for server response))
> Jan 23 01:02:52   at 
> com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:37)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:63)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39)
> Jan 23 01:02:52   at 
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.testRetrialAndDropTables(CassandraConnectorITCase.java:554)
> Jan 23 01:02:52   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Jan 23 01:02:52   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jan 23 01:02:52   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jan 23 01:02:52   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 23 01:02:52   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jan 23 01:02:52   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Jan 23 01:02:52   at 
> org.apache.flink.testutils.junit.RetryRule$RetryOnExceptionStatement.evaluate(RetryRule.java:196)
> Jan 23 01:02:52   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jan 23 01:02:52   at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> Jan 23 01:02:52   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> Jan 23 01:02:52   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> Jan 23 01:02:52   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Jan 23 01:02:52   at 
> org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:30)
> Jan 23 01:02:52   at org.junit.rules.RunRules.eva

[jira] [Updated] (FLINK-26066) Introduce FileStoreRead

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-26066:
-
Parent: FLINK-26116
Issue Type: Sub-task  (was: Bug)

> Introduce FileStoreRead
> ---
>
> Key: FLINK-26066
> URL: https://issues.apache.org/jira/browse/FLINK-26066
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Apart from {{FileStoreWrite}}, we also need a {{FileStoreRead}} operation to 
> read actual key-values for a specific partition and bucket.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] dmvk commented on a change in pull request #18689: [FLINK-21439][runtime] Exception history adaptive scheduler

2022-02-14 Thread GitBox


dmvk commented on a change in pull request #18689:
URL: https://github.com/apache/flink/pull/18689#discussion_r805607024



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##
@@ -306,22 +331,88 @@ void deliverOperatorEventToCoordinator(
                 operatorId, request);
     }
 
+    /** Transition to different state when failure occurs. Stays in the same state by default. */
+    abstract void onFailure(Throwable cause);
+
+    /**
+     * Transition to different state when the execution graph reaches a globally terminal state.
+     *
+     * @param globallyTerminalState globally terminal state which the execution graph reached
+     */
+    abstract void onGloballyTerminalState(JobStatus globallyTerminalState);
+
+    @Override
+    public void handleGlobalFailure(Throwable cause) {
+        failureCollection.add(new GlobalFailure(cause));
+        onFailure(cause);
+    }
+
     /**
      * Updates the execution graph with the given task execution state transition.
      *
      * @param taskExecutionStateTransition taskExecutionStateTransition to update the ExecutionGraph
      *     with
      * @return {@code true} if the update was successful; otherwise {@code false}
      */
-    abstract boolean updateTaskExecutionState(
-            TaskExecutionStateTransition taskExecutionStateTransition);
+    boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionStateTransition) {
+        if (taskExecutionStateTransition.getExecutionState() != ExecutionState.FAILED) {
+            return getExecutionGraph().updateState(taskExecutionStateTransition);
+        }
 
-    /**
-     * Callback which is called once the execution graph reaches a globally terminal state.
-     *
-     * @param globallyTerminalState globally terminal state which the execution graph reached
-     */
-    abstract void onGloballyTerminalState(JobStatus globallyTerminalState);
+        // We need to collect the ExecutionVertexID before updating the state, because the Execution
+        // is de-registered afterwards.
+        // We need to use an optional here, because this method can be called even after the
+        // Execution is de-registered.
+        Optional<ExecutionVertexID> idOpt =
+                executionGraph.findExecutionVertexId(taskExecutionStateTransition.getID());
+        final boolean successfulUpdate =
+                getExecutionGraph().updateState(taskExecutionStateTransition);
+        if (!successfulUpdate) {
+            return false;
+        }
+
+        Throwable cause = extractErrorOrUseDefault(taskExecutionStateTransition);
+
+        checkState(idOpt.isPresent());
+        ExecutionVertexID id = idOpt.get();
+
+        if (getNonEmptyExecution(id).getFailureInfo().isPresent()) {
+            failureCollection.add(new LocalFailure(cause, id));
+        }
+        onFailure(cause);
+        return true;

Review comment:
   > Previously desiredState = "FAILED" and newState = "CANCELLING" would 
return true. Now they return false.
   
   That's intended. As a side-effect it basically solves the failing cancel 
problem we've discussed previously.
   
   > How about, we use this:
   
TBH I find these early returns super hard to read. Usually I'd try to avoid 
them unless the nesting would be too deep / we have a really long method 
that can't be easily broken into smaller fragments.
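For readers following the style discussion: the two shapes being compared are guard-clause early returns versus a single-exit nested form. A toy example of both, equivalent in behavior:

```java
// Toy illustration of the two styles under discussion; both return the same values.
class ReturnStyleSketch {
    // Guard-clause / early-return style.
    static int parseEarly(String s) {
        if (s == null) return -1;
        if (s.isEmpty()) return 0;
        return s.length();
    }

    // Single-exit nested style.
    static int parseNested(String s) {
        int result;
        if (s == null) {
            result = -1;
        } else if (s.isEmpty()) {
            result = 0;
        } else {
            result = s.length();
        }
        return result;
    }
}
```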




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Reopened] (FLINK-25771) CassandraConnectorITCase.testRetrialAndDropTables timeouts on AZP

2022-02-14 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao reopened FLINK-25771:
-

> CassandraConnectorITCase.testRetrialAndDropTables timeouts on AZP
> -
>
> Key: FLINK-25771
> URL: https://issues.apache.org/jira/browse/FLINK-25771
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Cassandra
>Affects Versions: 1.15.0, 1.13.5, 1.14.3
>Reporter: Till Rohrmann
>Assignee: Etienne Chauchot
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.15.0, 1.13.6, 1.14.4
>
>
> The test {{CassandraConnectorITCase.testRetrialAndDropTables}} fails on AZP 
> with
> {code}
> Jan 23 01:02:52 com.datastax.driver.core.exceptions.NoHostAvailableException: 
> All host(s) tried for query failed (tried: /172.17.0.1:59220 
> (com.datastax.driver.core.exceptions.OperationTimedOutException: 
> [/172.17.0.1] Timed out waiting for server response))
> Jan 23 01:02:52   at 
> com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:37)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:63)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39)
> Jan 23 01:02:52   at 
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.testRetrialAndDropTables(CassandraConnectorITCase.java:554)
> Jan 23 01:02:52   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Jan 23 01:02:52   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jan 23 01:02:52   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jan 23 01:02:52   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 23 01:02:52   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jan 23 01:02:52   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Jan 23 01:02:52   at 
> org.apache.flink.testutils.junit.RetryRule$RetryOnExceptionStatement.evaluate(RetryRule.java:196)
> Jan 23 01:02:52   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jan 23 01:02:52   at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> Jan 23 01:02:52   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> Jan 23 01:02:52   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> Jan 23 01:02:52   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Jan 23 01:02:52   at 
> org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:30)
> Jan 23 01:02:52   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> Jan 23 01:02:52   at org.junit.runners.Suite.runChild(Suite.java:128)
> Jan 23 01:02:52   at org.junit.runners.Suite.runChild(Suite.java:27)
> Jan 23 01:02:52   at 
> o

[GitHub] [flink] infoverload opened a new pull request #18746: Dt/fix security pages

2022-02-14 Thread GitBox


infoverload opened a new pull request #18746:
URL: https://github.com/apache/flink/pull/18746


   
   
   ## What is the purpose of the change
   
   Fix security section of the docs
   
   
   ## Brief change log
   
   - reorganization of flow
   - add links and more text
   - lots of restructuring and renaming things
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-1493) Support for streaming jobs preserving global ordering of records

2022-02-14 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-1493.
-
Resolution: Won't Do

> Support for streaming jobs preserving global ordering of records
> 
>
> Key: FLINK-1493
> URL: https://issues.apache.org/jira/browse/FLINK-1493
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Reporter: Márton Balassi
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> Distributed streaming jobs do not give total, global ordering guarantees for 
> records; only partial ordering is provided by the system: records travelling 
> on the same exact route of the physical plan are ordered, but they aren't 
> across routes.
> It turns out that although this feature can only be implemented via "merge 
> sorting" in the input buffers on a timestamp field, thus creating substantial 
> latency, it is still desired for a number of applications.
> Just a heads up for the implementation: the sorting introduces back pressure 
> in the buffers and might cause deadlocks.
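The "merge sorting in the input buffers on a timestamp field" mentioned above is essentially a k-way merge of already-ordered channels. A minimal sketch, with illustrative types where each record is a {timestamp, payload} pair:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

// k-way merge of timestamp-ordered channels; each record is {timestamp, payload}.
class TimestampMergeSketch {
    static List<long[]> merge(List<Deque<long[]>> channels) {
        // Heap entries pair the current head record with its source channel.
        PriorityQueue<Object[]> heap =
                new PriorityQueue<>(Comparator.comparingLong((Object[] e) -> ((long[]) e[0])[0]));
        for (Deque<long[]> ch : channels) {
            if (!ch.isEmpty()) {
                heap.add(new Object[] {ch.poll(), ch});
            }
        }
        List<long[]> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Object[] e = heap.poll();
            out.add((long[]) e[0]); // smallest timestamp among all channel heads
            @SuppressWarnings("unchecked")
            Deque<long[]> ch = (Deque<long[]>) e[1];
            if (!ch.isEmpty()) {
                heap.add(new Object[] {ch.poll(), ch}); // refill from the same channel
            }
        }
        return out;
    }
}
```

Note the latency/deadlock caveat from the description: a real implementation can only emit a record once every channel has advanced past its timestamp, so an idle channel stalls the merge.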



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24744) FlinkKafkaProducerITCase.testMigrateFromAtLeastOnceToExactlyOnce fails on Azure because org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server do

2022-02-14 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17491868#comment-17491868
 ] 

Yun Gao commented on FLINK-24744:
-

Another case for testRecoveryWithExactlyOnceGuaranteeAndConcurrentCheckpoints: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=31229&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=15a22db7-8faa-5b34-3920-d33c9f0ca23c&l=36064

> FlinkKafkaProducerITCase.testMigrateFromAtLeastOnceToExactlyOnce fails on 
> Azure because 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition
> ---
>
> Key: FLINK-24744
> URL: https://issues.apache.org/jira/browse/FLINK-24744
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.15.0
>Reporter: Till Rohrmann
>Assignee: Fabian Paul
>Priority: Critical
>  Labels: stale-assigned, test-stability
> Fix For: 1.15.0
>
>
> {{FlinkKafkaProducerITCase.testMigrateFromAtLeastOnceToExactlyOnce}} fails on 
> Azure because 
> {{org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition}}.
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=25826&view=logs&j=ce8f3cc3-c1ea-5281-f5eb-df9ebd24947f&t=f266c805-9429-58ed-2f9e-482e7b82f58b&l=6670



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot commented on pull request #18746: [docs]revamp security pages

2022-02-14 Thread GitBox


flinkbot commented on pull request #18746:
URL: https://github.com/apache/flink/pull/18746#issuecomment-1038807139


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit d46bd54e8d4e05e791d1679c5305100ec614c4c1 (Mon Feb 14 
08:43:14 UTC 2022)
   
   **Warnings:**
* Documentation files were touched, but no `docs/content.zh/` files: Update 
Chinese documentation or file Jira ticket.
* **Invalid pull request title: No valid Jira ID provided**
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-25152) FLIP-188: Introduce Built-in Dynamic Table Storage

2022-02-14 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25152:
-
Fix Version/s: (was: table-store-0.1.0)

> FLIP-188: Introduce Built-in Dynamic Table Storage
> --
>
> Key: FLINK-25152
> URL: https://issues.apache.org/jira/browse/FLINK-25152
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API, Table SQL / Ecosystem, Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
> Fix For: 1.15.0
>
>
> Introduce built-in storage support for dynamic tables: a truly unified 
> changelog & table representation from Flink SQL's perspective. The storage 
> will greatly improve usability.
> For more detail see: 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] rkhachatryan commented on a change in pull request #18729: [FLINK-26093][tests] Adjust SavepointFormatITCase for ChangelogStateBackend

2022-02-14 Thread GitBox


rkhachatryan commented on a change in pull request #18729:
URL: https://github.com/apache/flink/pull/18729#discussion_r805613253



##
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
##
@@ -78,138 +82,151 @@
 LoggerAuditingExtension loggerAuditingExtension =
 new LoggerAuditingExtension(SavepointFormatITCase.class, 
Level.INFO);
 
-private static Stream<Arguments> parameters() {
-return Stream.of(
-Arguments.of(
-SavepointFormatType.CANONICAL,
-HEAP,
-(Consumer<KeyedStateHandle>)
-keyedState ->
-assertThat(
-keyedState,
-
instanceOf(SavepointKeyedStateHandle.class))),
-Arguments.of(
-SavepointFormatType.NATIVE,
-HEAP,
-(Consumer<KeyedStateHandle>)
-keyedState ->
-assertThat(
-keyedState,
-
instanceOf(KeyGroupsStateHandle.class))),
-Arguments.of(
-SavepointFormatType.CANONICAL,
-ROCKSDB_FULL_SNAPSHOTS,
-(Consumer<KeyedStateHandle>)
-keyedState ->
-assertThat(
-keyedState,
-
instanceOf(SavepointKeyedStateHandle.class))),
-Arguments.of(
-SavepointFormatType.NATIVE,
-ROCKSDB_FULL_SNAPSHOTS,
-(Consumer<KeyedStateHandle>)
-keyedState ->
-assertThat(
-keyedState,
-
instanceOf(KeyGroupsStateHandle.class))),
-Arguments.of(
-SavepointFormatType.CANONICAL,
-ROCKSDB_INCREMENTAL_SNAPSHOTS,
-(Consumer<KeyedStateHandle>)
-keyedState ->
-assertThat(
-keyedState,
-
instanceOf(SavepointKeyedStateHandle.class))),
-Arguments.of(
-SavepointFormatType.NATIVE,
-ROCKSDB_INCREMENTAL_SNAPSHOTS,
-(Consumer<KeyedStateHandle>)
-keyedState ->
-assertThat(
-keyedState,
-instanceOf(
-
IncrementalRemoteKeyedStateHandle.class;
+private static List<Arguments> parameters() {
+// iterate through all combinations of backends, isIncremental, 
isChangelogEnabled
+List<Arguments> result = new LinkedList<>();
+for (BiFunction<Boolean, Boolean, StateBackendConfig> builder :
+StateBackendConfig.builders) {
+for (boolean incremental : new boolean[] {true, false}) {
+for (boolean changelog : new boolean[] {true, false}) {
+for (SavepointFormatType formatType : 
SavepointFormatType.values()) {
+result.add(Arguments.of(formatType, 
builder.apply(incremental, changelog)));
+}
+}
+}
+}
+return result;
+}
+
+private void validateState(
+KeyedStateHandle state,
+SavepointFormatType formatType,
+StateBackendConfig backendConfig) {
+if (formatType == SavepointFormatType.CANONICAL) {
+assertThat(state, instanceOf(SavepointKeyedStateHandle.class));
+} else if (backendConfig.isChangelogEnabled()) {
+assertThat(state, instanceOf(ChangelogStateBackendHandle.class));
+for (KeyedStateHandle nestedState :
+((ChangelogStateBackendHandle) 
state).getMaterializedStateHandles()) {
+validateNativeNonChangelogState(nestedState, backendConfig);
+}
+} else {
+validateNativeNonChangelogState(state, backendConfig);
+}
+}
+
+private void validateNativeNonChangelogState(
+KeyedStateHandle state, StateBackendConfig backendConfig) {
+if (backendConfig.isIncremental()) {
+assertThat(state, 
instanceOf(IncrementalRemoteKeyedStateHandle.class));
+} else {
+assertThat(state, instanceOf(KeyGroupsStateHandle.class));
+}
 }
 
 private abstract static class StateBackendConfig {
+   
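The refactoring in this diff replaces a hand-enumerated argument list with nested loops that generate the cartesian product of backend builders, the incremental flag, the changelog flag, and the savepoint format. A minimal standalone sketch of the same cartesian-product idea (hypothetical names, not the actual Flink test class):

```java
import java.util.ArrayList;
import java.util.List;

public class CartesianParams {
    // Hypothetical stand-in for SavepointFormatType.
    enum FormatType { CANONICAL, NATIVE }

    static List<String> parameters() {
        // Enumerate every combination of backend, incremental flag,
        // changelog flag and savepoint format, mirroring the nested
        // loops in the refactored parameters() method.
        List<String> result = new ArrayList<>();
        for (String backend : new String[] {"heap", "rocksdb"}) {
            for (boolean incremental : new boolean[] {true, false}) {
                for (boolean changelog : new boolean[] {true, false}) {
                    for (FormatType format : FormatType.values()) {
                        result.add(backend + "/" + incremental + "/" + changelog + "/" + format);
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // 2 backends * 2 incremental * 2 changelog * 2 formats = 16 combinations
        System.out.println(parameters().size());
    }
}
```

Generating combinations this way keeps the test in sync automatically when a new backend or format is added, instead of requiring every combination to be listed by hand.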

[jira] [Commented] (FLINK-25456) FlinkKafkaProducerITCase.testScaleDownBeforeFirstCheckpoint

2022-02-14 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17491870#comment-17491870
 ] 

Yun Gao commented on FLINK-25456:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=31256&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=15a22db7-8faa-5b34-3920-d33c9f0ca23c&l=36113

> FlinkKafkaProducerITCase.testScaleDownBeforeFirstCheckpoint
> ---
>
> Key: FLINK-25456
> URL: https://issues.apache.org/jira/browse/FLINK-25456
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.2
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The test {{FlinkKafkaProducerITCase.testScaleDownBeforeFirstCheckpoint}} 
> fails with
> {code}
> 2021-12-27T02:54:54.8464375Z Dec 27 02:54:54 [ERROR] Tests run: 15, Failures: 
> 1, Errors: 0, Skipped: 0, Time elapsed: 285.279 s <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase
> 2021-12-27T02:54:54.8465354Z Dec 27 02:54:54 [ERROR] 
> testScaleDownBeforeFirstCheckpoint  Time elapsed: 85.514 s  <<< FAILURE!
> 2021-12-27T02:54:54.8468827Z Dec 27 02:54:54 java.lang.AssertionError: 
> Detected producer leak. Thread name: kafka-producer-network-thread | 
> producer-MockTask-002a002c-18
> 2021-12-27T02:54:54.8469779Z Dec 27 02:54:54  at 
> org.junit.Assert.fail(Assert.java:89)
> 2021-12-27T02:54:54.8470485Z Dec 27 02:54:54  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.checkProducerLeak(FlinkKafkaProducerITCase.java:847)
> 2021-12-27T02:54:54.8471842Z Dec 27 02:54:54  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testScaleDownBeforeFirstCheckpoint(FlinkKafkaProducerITCase.java:381)
> 2021-12-27T02:54:54.8472724Z Dec 27 02:54:54  at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-12-27T02:54:54.8473509Z Dec 27 02:54:54  at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-12-27T02:54:54.8474704Z Dec 27 02:54:54  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-12-27T02:54:54.8475523Z Dec 27 02:54:54  at 
> java.base/java.lang.reflect.Method.invoke(Method.java:566)
> 2021-12-27T02:54:54.8476258Z Dec 27 02:54:54  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2021-12-27T02:54:54.8476949Z Dec 27 02:54:54  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-12-27T02:54:54.8477632Z Dec 27 02:54:54  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2021-12-27T02:54:54.8478451Z Dec 27 02:54:54  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-12-27T02:54:54.8479282Z Dec 27 02:54:54  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2021-12-27T02:54:54.8479976Z Dec 27 02:54:54  at 
> org.apache.flink.testutils.junit.RetryRule$RetryOnFailureStatement.evaluate(RetryRule.java:135)
> 2021-12-27T02:54:54.8480696Z Dec 27 02:54:54  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2021-12-27T02:54:54.8481410Z Dec 27 02:54:54  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2021-12-27T02:54:54.8482009Z Dec 27 02:54:54  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2021-12-27T02:54:54.8482636Z Dec 27 02:54:54  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 2021-12-27T02:54:54.8483267Z Dec 27 02:54:54  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 2021-12-27T02:54:54.8483900Z Dec 27 02:54:54  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 2021-12-27T02:54:54.8484574Z Dec 27 02:54:54  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 2021-12-27T02:54:54.8485214Z Dec 27 02:54:54  at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 2021-12-27T02:54:54.8485838Z Dec 27 02:54:54  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 2021-12-27T02:54:54.8486441Z Dec 27 02:54:54  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 2021-12-27T02:54:54.8487037Z Dec 27 02:54:54  at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> 2021-12-27T02:54:54.8487620Z Dec 27 02:54:54  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> 2021-12-27T02:54:54.8488391Z Dec 27 02:54:54  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.ja

[jira] [Updated] (FLINK-25456) FlinkKafkaProducerITCase.testScaleDownBeforeFirstCheckpoint

2022-02-14 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-25456:

Affects Version/s: 1.15.0

> FlinkKafkaProducerITCase.testScaleDownBeforeFirstCheckpoint
> ---
>
> Key: FLINK-25456
> URL: https://issues.apache.org/jira/browse/FLINK-25456
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.15.0, 1.14.2
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The test {{FlinkKafkaProducerITCase.testScaleDownBeforeFirstCheckpoint}} 
> fails with
> {code}
> 2021-12-27T02:54:54.8464375Z Dec 27 02:54:54 [ERROR] Tests run: 15, Failures: 
> 1, Errors: 0, Skipped: 0, Time elapsed: 285.279 s <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase
> 2021-12-27T02:54:54.8465354Z Dec 27 02:54:54 [ERROR] 
> testScaleDownBeforeFirstCheckpoint  Time elapsed: 85.514 s  <<< FAILURE!
> 2021-12-27T02:54:54.8468827Z Dec 27 02:54:54 java.lang.AssertionError: 
> Detected producer leak. Thread name: kafka-producer-network-thread | 
> producer-MockTask-002a002c-18
> 2021-12-27T02:54:54.8469779Z Dec 27 02:54:54  at 
> org.junit.Assert.fail(Assert.java:89)
> 2021-12-27T02:54:54.8470485Z Dec 27 02:54:54  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.checkProducerLeak(FlinkKafkaProducerITCase.java:847)
> 2021-12-27T02:54:54.8471842Z Dec 27 02:54:54  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testScaleDownBeforeFirstCheckpoint(FlinkKafkaProducerITCase.java:381)
> 2021-12-27T02:54:54.8472724Z Dec 27 02:54:54  at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-12-27T02:54:54.8473509Z Dec 27 02:54:54  at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-12-27T02:54:54.8474704Z Dec 27 02:54:54  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-12-27T02:54:54.8475523Z Dec 27 02:54:54  at 
> java.base/java.lang.reflect.Method.invoke(Method.java:566)
> 2021-12-27T02:54:54.8476258Z Dec 27 02:54:54  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2021-12-27T02:54:54.8476949Z Dec 27 02:54:54  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-12-27T02:54:54.8477632Z Dec 27 02:54:54  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2021-12-27T02:54:54.8478451Z Dec 27 02:54:54  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-12-27T02:54:54.8479282Z Dec 27 02:54:54  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2021-12-27T02:54:54.8479976Z Dec 27 02:54:54  at 
> org.apache.flink.testutils.junit.RetryRule$RetryOnFailureStatement.evaluate(RetryRule.java:135)
> 2021-12-27T02:54:54.8480696Z Dec 27 02:54:54  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2021-12-27T02:54:54.8481410Z Dec 27 02:54:54  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2021-12-27T02:54:54.8482009Z Dec 27 02:54:54  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2021-12-27T02:54:54.8482636Z Dec 27 02:54:54  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 2021-12-27T02:54:54.8483267Z Dec 27 02:54:54  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 2021-12-27T02:54:54.8483900Z Dec 27 02:54:54  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 2021-12-27T02:54:54.8484574Z Dec 27 02:54:54  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 2021-12-27T02:54:54.8485214Z Dec 27 02:54:54  at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 2021-12-27T02:54:54.8485838Z Dec 27 02:54:54  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 2021-12-27T02:54:54.8486441Z Dec 27 02:54:54  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 2021-12-27T02:54:54.8487037Z Dec 27 02:54:54  at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> 2021-12-27T02:54:54.8487620Z Dec 27 02:54:54  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> 2021-12-27T02:54:54.8488391Z Dec 27 02:54:54  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2021-12-27T02:54:54.8489050Z Dec 27 02:54:54  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2021-12-27T02:54:54.8489685Z Dec 27 02:54:54  at 
> or

[jira] [Updated] (FLINK-26100) Set up Flink ML Document Website

2022-02-14 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-26100:
-
Component/s: Documentation

> Set up Flink ML Document Website
> 
>
> Key: FLINK-26100
> URL: https://issues.apache.org/jira/browse/FLINK-26100
> Project: Flink
>  Issue Type: New Feature
>  Components: Documentation, Library / Machine Learning
>Affects Versions: ml-2.0.0
>Reporter: Yunfeng Zhou
>Assignee: Yunfeng Zhou
>Priority: Major
>  Labels: pull-request-available
>
> Set up Flink ML's document website based on flink document and statefun 
> document website.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-26100) Set up Flink ML Document Website

2022-02-14 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reassigned FLINK-26100:


Assignee: Yunfeng Zhou

> Set up Flink ML Document Website
> 
>
> Key: FLINK-26100
> URL: https://issues.apache.org/jira/browse/FLINK-26100
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Affects Versions: ml-2.0.0
>Reporter: Yunfeng Zhou
>Assignee: Yunfeng Zhou
>Priority: Major
>  Labels: pull-request-available
>
> Set up Flink ML's document website based on flink document and statefun 
> document website.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26100) Set up Flink ML Document Website

2022-02-14 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-26100:
-
Component/s: (was: Documentation)

> Set up Flink ML Document Website
> 
>
> Key: FLINK-26100
> URL: https://issues.apache.org/jira/browse/FLINK-26100
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Affects Versions: ml-2.0.0
>Reporter: Yunfeng Zhou
>Assignee: Yunfeng Zhou
>Priority: Major
>  Labels: pull-request-available
>
> Set up Flink ML's document website based on flink document and statefun 
> document website.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] rkhachatryan commented on a change in pull request #18741: [FLINK-26101][changelog] Avoid shared state registry to discard multi-registered identical changelog state

2022-02-14 Thread GitBox


rkhachatryan commented on a change in pull request #18741:
URL: https://github.com/apache/flink/pull/18741#discussion_r805618324



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
##
@@ -237,6 +237,23 @@ public FSDataInputStream openInputStream() throws 
IOException {
 public Optional<byte[]> asBytesIfInMemory() {
 throw new UnsupportedOperationException("Should not call 
here.");
 }
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+StreamStateHandleWrapper that = (StreamStateHandleWrapper) o;
+return Objects.equals(keyedStateHandle, that.keyedStateHandle);

Review comment:
   > The PlaceholderStreamStateHandle itself is not the materialized keyed 
state handle, but part of it.
   
   Yes, but because of wrapping, `SharedStateRegistry` will call 
`IncrementalRemoteKeyedStateHandle.equals`, which in turn will compare File 
state handle with a placeholder. Currently, `SharedStateRegistry` has a 
separate branch for placeholders.
   
   > Actually, I did not understand in what case will we have two or more 
materializations, in the case of restoring?
   
   Yes.
   
   > Or in other words, what test will fail if do not compare the state handle 
id?
   
   IIRC, `EventTimeWindowCheckpointingITCase` was failing with materialization 
interval of `100ms`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
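The equals() override under review defines wrapper equality entirely by the wrapped keyed state handle, so the registry treats two wrappers around equal handles as the same entry. A generic sketch of that delegation pattern (hypothetical Wrapper class, not Flink's StreamStateHandleWrapper):

```java
import java.util.Objects;

public final class Wrapper {
    private final Object wrapped;

    public Wrapper(Object wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        // Equality is defined entirely by the wrapped handle, so two
        // wrappers around equal handles compare equal.
        return Objects.equals(wrapped, ((Wrapper) o).wrapped);
    }

    @Override
    public int hashCode() {
        // hashCode must stay consistent with equals, or hash-based
        // registries will miss existing entries.
        return Objects.hashCode(wrapped);
    }
}
```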




[GitHub] [flink] dmvk commented on a change in pull request #18689: [FLINK-21439][runtime] Exception history adaptive scheduler

2022-02-14 Thread GitBox


dmvk commented on a change in pull request #18689:
URL: https://github.com/apache/flink/pull/18689#discussion_r805618265



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##
@@ -306,22 +331,88 @@ void deliverOperatorEventToCoordinator(
 operatorId, request);
 }
 
+/** Transition to different state when failure occurs. Stays in the same 
state by default. */
+abstract void onFailure(Throwable cause);
+
+/**
+ * Transition to different state when the execution graph reaches a 
globally terminal state.
+ *
+ * @param globallyTerminalState globally terminal state which the 
execution graph reached
+ */
+abstract void onGloballyTerminalState(JobStatus globallyTerminalState);
+
+@Override
+public void handleGlobalFailure(Throwable cause) {
+failureCollection.add(new GlobalFailure(cause));
+onFailure(cause);
+}
+
 /**
  * Updates the execution graph with the given task execution state 
transition.
  *
  * @param taskExecutionStateTransition taskExecutionStateTransition to 
update the ExecutionGraph
  * with
  * @return {@code true} if the update was successful; otherwise {@code 
false}
  */
-abstract boolean updateTaskExecutionState(
-TaskExecutionStateTransition taskExecutionStateTransition);
+boolean updateTaskExecutionState(TaskExecutionStateTransition 
taskExecutionStateTransition) {
+if (taskExecutionStateTransition.getExecutionState() != 
ExecutionState.FAILED) {
+return 
getExecutionGraph().updateState(taskExecutionStateTransition);
+}
 
-/**
- * Callback which is called once the execution graph reaches a globally 
terminal state.
- *
- * @param globallyTerminalState globally terminal state which the 
execution graph reached
- */
-abstract void onGloballyTerminalState(JobStatus globallyTerminalState);
+// We need to collect the ExecutionVertexID before updating the state, 
because the Execution
+// is de-registered afterwards.
+// We need to use an optional here, because this method can be called 
even after the
+// Execution is de-registered.
+Optional<ExecutionVertexID> idOpt =
+
executionGraph.findExecutionVertexId(taskExecutionStateTransition.getID());
+final boolean successfulUpdate =
+getExecutionGraph().updateState(taskExecutionStateTransition);
+if (!successfulUpdate) {
+return false;
+}
+
+Throwable cause = 
extractErrorOrUseDefault(taskExecutionStateTransition);
+
+checkState(idOpt.isPresent());
+ExecutionVertexID id = idOpt.get();
+
+if (getNonEmptyExecution(id).getFailureInfo().isPresent()) {
+failureCollection.add(new LocalFailure(cause, id));
+}
+onFailure(cause);
+return true;

Review comment:
On second thought, you're right that we should still return true even though 
we're not actually doing any work.
   
   Then it can be basically simplified to this:
   
   ```java
   final Optional<ExecutionVertexID> maybeExecutionVertexId =
   
executionGraph.findExecutionVertexId(taskExecutionStateTransition.getID());
   final boolean successfulUpdate =
   
getExecutionGraph().updateState(taskExecutionStateTransition);
   if (successfulUpdate) {
   // We're sure that the executionVertexId has been found, because 
we've been able to
   // update the execution graph.
   final ExecutionVertexID executionVertexId =
   
maybeExecutionVertexId.orElseThrow(NoSuchElementException::new);
   final ExecutionState desiredState = 
taskExecutionStateTransition.getExecutionState();
   final ExecutionState newState =
   executionGraph
   .findExecution(executionVertexId)
   .map(Execution::getState)
   .orElseThrow(NoSuchElementException::new);
   // We only want a notification for the actual transition into 
the FAILED state.
   if (desiredState == ExecutionState.FAILED && desiredState == 
newState) {
   final Throwable cause = 
extractErrorOrUseDefault(taskExecutionStateTransition);
   if 
(getNonEmptyExecution(executionVertexId).getFailureInfo().isPresent()) {
   failureCollection.add(new LocalFailure(cause, 
executionVertexId));
   }
   onFailure(cause);
   }
   }
   return successfulUpdate;
   ```
   
   Sorry for the confusion




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail

[GitHub] [flink] flinkbot edited a comment on pull request #18689: [FLINK-21439][runtime] Exception history adaptive scheduler

2022-02-14 Thread GitBox


flinkbot edited a comment on pull request #18689:
URL: https://github.com/apache/flink/pull/18689#issuecomment-1033918208


   
   ## CI report:
   
   * 0666d52c8b1e8a45f53366bbcdcbfc5468c436a7 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=31343)
 
   * 5d1ce4952e98bd88315c899f612d2b1438af9f9a Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=31382)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #18736: [hotfix][datastream] move the change and restore of env parallelism into the adjusTransformations method.

2022-02-14 Thread GitBox


flinkbot edited a comment on pull request #18736:
URL: https://github.com/apache/flink/pull/18736#issuecomment-1036747854


   
   ## CI report:
   
   * ba8ff7ae7257614a8987c6cf157c95eba5e9ca32 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=31302)
 
   * 4250e55855addd481d9280358dc6ac83e0db8fcd UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-26100) Set up Flink ML Document Website

2022-02-14 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-26100:
-
Issue Type: Improvement  (was: New Feature)

> Set up Flink ML Document Website
> 
>
> Key: FLINK-26100
> URL: https://issues.apache.org/jira/browse/FLINK-26100
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Affects Versions: ml-2.0.0
>Reporter: Yunfeng Zhou
>Assignee: Yunfeng Zhou
>Priority: Major
>  Labels: pull-request-available
>
> Set up Flink ML's document website based on flink document and statefun 
> document website.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] wsry commented on pull request #18723: [FLINK-25640][docs] Enhance the document for blocking shuffle

2022-02-14 Thread GitBox


wsry commented on pull request #18723:
URL: https://github.com/apache/flink/pull/18723#issuecomment-1038819617


   @gaoyunhaii Thanks for the review and feedback. I have updated the PR 
accordingly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-table-store] tsreaper commented on a change in pull request #16: [FLINK-25876] Implement overwrite in FlinkStoreCommitImpl

2022-02-14 Thread GitBox


tsreaper commented on a change in pull request #16:
URL: https://github.com/apache/flink-table-store/pull/16#discussion_r805623690



##
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
##
@@ -168,21 +157,67 @@ public void overwrite(
 Map<String, String> partition,
 ManifestCommittable committable,
 Map<String, String> properties) {
-throw new UnsupportedOperationException();
+if (LOG.isDebugEnabled()) {
+LOG.debug(
+"Ready to overwrite partition "
++ partition.toString()
++ "\n"
++ committable.toString());
+}
+
+BinaryRowData partitionRowData =
+TypeUtils.partitionMapToBinaryRowData(partition, 
partitionType);
+
+List<ManifestEntry> appendChanges = 
collectChanges(committable.newFiles(), ValueKind.ADD);
+tryOverwrite(
+partitionRowData, appendChanges, committable.uuid(), 
Snapshot.CommitKind.APPEND);
+
+List<ManifestEntry> compactChanges = new ArrayList<>();
+compactChanges.addAll(collectChanges(committable.compactBefore(), 
ValueKind.DELETE));
+compactChanges.addAll(collectChanges(committable.compactAfter(), 
ValueKind.ADD));
+tryCommit(compactChanges, committable.uuid(), 
Snapshot.CommitKind.COMPACT);
 }
 
-private String digestManifestCommittable(ManifestCommittable committable) {
-try {
-return new String(
-Base64.getEncoder()
-.encode(
-MessageDigest.getInstance("MD5")
-
.digest(committableSerializer.serialize(committable;
-} catch (NoSuchAlgorithmException e) {
-throw new RuntimeException("MD5 algorithm not found. This is 
impossible.", e);
-} catch (IOException e) {
-throw new RuntimeException(
-"Failed to serialize ManifestCommittable. This is 
unexpected.", e);
+private void tryCommit(
+List<ManifestEntry> changes, String hash, Snapshot.CommitKind 
commitKind) {
+while (true) {
+Long latestSnapshotId = pathFactory.latestSnapshotId();
+if (tryCommitOnce(changes, hash, commitKind, latestSnapshotId)) {
+break;
+}
+}
+}
+
+private void tryOverwrite(
+BinaryRowData partition,
+List<ManifestEntry> changes,
+String hash,
+Snapshot.CommitKind commitKind) {
+while (true) {
+Long latestSnapshotId = pathFactory.latestSnapshotId();
+
+List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
+if (latestSnapshotId != null) {
+List<ManifestEntry> currentEntries =
+scan.withSnapshot(latestSnapshotId)
+
.withPartitionFilter(Collections.singletonList(partition))
+.plan()
+.files();
+for (ManifestEntry entry : currentEntries) {
+changesWithOverwrite.add(
+new ManifestEntry(
+ValueKind.DELETE,
+entry.partition(),
+entry.bucket(),
+entry.totalBuckets(),
+entry.file()));
+}
+}
+changesWithOverwrite.addAll(changes);
+
+if (tryCommitOnce(changesWithOverwrite, hash, commitKind, 
latestSnapshotId)) {

Review comment:
   Another job might be overwriting the same partition at the same time, so 
we still need to check this.
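The retry loops in `tryCommit`/`tryOverwrite` are a classic optimistic-concurrency pattern: re-read the latest snapshot id on every attempt, rebuild the change list against it, and start over whenever another committer wins the race. Below is a minimal, self-contained sketch of that pattern; the class and method names (`OptimisticCommit`, `commit`, `tryCommitOnce`) are hypothetical and an `AtomicLong` stands in for the snapshot store, not the table store's actual API:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of an optimistic-commit retry loop: read the latest
// snapshot id, attempt to publish snapshot id + 1 atomically, and restart the
// whole attempt if another committer won the race in between.
public class OptimisticCommit {
    // Stands in for pathFactory.latestSnapshotId() plus the atomic publish step.
    private final AtomicLong latestSnapshotId = new AtomicLong(0);

    // Succeeds only if no other job committed since `expectedLatest` was read.
    boolean tryCommitOnce(long expectedLatest) {
        return latestSnapshotId.compareAndSet(expectedLatest, expectedLatest + 1);
    }

    // Retries until one attempt lands. Each attempt must rebuild its changes
    // against the freshly read latest snapshot (e.g. the DELETE entries for an
    // overwritten partition); reusing a stale change list could silently drop
    // files that a concurrent writer just added.
    long commit() {
        while (true) {
            long latest = latestSnapshotId.get(); // re-read on every attempt
            // ... recompute the change list against `latest` here ...
            if (tryCommitOnce(latest)) {
                return latest + 1; // id of the snapshot we just published
            }
            // Lost the race: loop and rebuild.
        }
    }

    public static void main(String[] args) {
        OptimisticCommit c = new OptimisticCommit();
        System.out.println(c.commit()); // prints 1
        System.out.println(c.commit()); // prints 2
    }
}
```

The key point, matching the comment above, is that the conflict check must happen inside the loop on every attempt, because another job may commit to the same partition between the read and the publish.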




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-26117) JobManagerMetricGroup needs to implement GloballyCleanableResource as well

2022-02-14 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-26117:
-

 Summary: JobManagerMetricGroup needs to implement 
GloballyCleanableResource as well
 Key: FLINK-26117
 URL: https://issues.apache.org/jira/browse/FLINK-26117
 Project: Flink
  Issue Type: Sub-task
Reporter: Matthias Pohl


A mistake was made during the introduction of the 
{{*CleanableResources}} interfaces: the {{JobManagerMetricGroup}} should also 
implement the global cleanup, since it is invoked in both cleanup types, not 
only the local one.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26117) JobManagerMetricGroup needs to implement GloballyCleanableResource as well

2022-02-14 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-26117:
--
Component/s: Runtime / Coordination

> JobManagerMetricGroup needs to implement GloballyCleanableResource as well
> --
>
> Key: FLINK-26117
> URL: https://issues.apache.org/jira/browse/FLINK-26117
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Matthias Pohl
>Priority: Major
>
> A mistake was made during the introduction of the 
> {{*CleanableResources}} interfaces: the {{JobManagerMetricGroup}} should also 
> implement the global cleanup, since it is invoked in both cleanup types, not 
> only the local one.





[jira] [Assigned] (FLINK-26117) JobManagerMetricGroup needs to implement GloballyCleanableResource as well

2022-02-14 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl reassigned FLINK-26117:
-

Assignee: Matthias Pohl

> JobManagerMetricGroup needs to implement GloballyCleanableResource as well
> --
>
> Key: FLINK-26117
> URL: https://issues.apache.org/jira/browse/FLINK-26117
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.15.0
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>
> A mistake was made during the introduction of the 
> {{*CleanableResources}} interfaces: the {{JobManagerMetricGroup}} should also 
> implement the global cleanup, since it is invoked in both cleanup types, not 
> only the local one.





[jira] [Updated] (FLINK-26117) JobManagerMetricGroup needs to implement GloballyCleanableResource as well

2022-02-14 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-26117:
--
Affects Version/s: 1.15.0

> JobManagerMetricGroup needs to implement GloballyCleanableResource as well
> --
>
> Key: FLINK-26117
> URL: https://issues.apache.org/jira/browse/FLINK-26117
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.15.0
>Reporter: Matthias Pohl
>Priority: Major
>
> A mistake was made during the introduction of the 
> {{*CleanableResources}} interfaces: the {{JobManagerMetricGroup}} should also 
> implement the global cleanup, since it is invoked in both cleanup types, not 
> only the local one.





  1   2   3   4   5   6   7   8   9   >