[jira] [Closed] (FLINK-7343) Kafka010ProducerITCase instability

2017-08-08 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-7343.
-
   Resolution: Fixed
Fix Version/s: 1.4.0

Hopefully, with the two recent changes, this issue will not show up again.

> Kafka010ProducerITCase instability
> --
>
> Key: FLINK-7343
> URL: https://issues.apache.org/jira/browse/FLINK-7343
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> As reported by [~till.rohrmann] in 
> https://issues.apache.org/jira/browse/FLINK-6996 there seems to be a test 
> instability with 
> `Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink`
> https://travis-ci.org/tillrohrmann/flink/jobs/258538641
> It is probably related to log.flush intervals in Kafka, which delay flushing 
> the data to files and can potentially cause data losses when killing Kafka 
> brokers in the tests.





[jira] [Created] (FLINK-7476) Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction

2017-08-18 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-7476:
-

 Summary: Try to recover from failure in 
TwoPhaseCommitSinkFunction.beginTransaction
 Key: FLINK-7476
 URL: https://issues.apache.org/jira/browse/FLINK-7476
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski


Currently when using TwoPhaseCommitSinkFunction, if there is some intermittent 
failure in "beginTransaction", not only does the snapshot that triggered this 
call fail, but any subsequent write requests will also fail, rendering such a 
sink unusable until the application is restarted.

This issue is in code that hasn't been released yet.
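For illustration, a minimal sketch of one conceivable mitigation: retrying an 
intermittent {{beginTransaction}} failure with backoff before letting the 
snapshot fail. The helper class and method names are hypothetical, not Flink 
code or the eventual fix:

{code:java}
// Hedged, hypothetical sketch; not the actual Flink code.
import java.util.function.Supplier;

class BeginTransactionRetrySketch {
    static <TXN> TXN beginWithRetry(Supplier<TXN> beginTransaction, int maxAttempts)
            throws InterruptedException {
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return beginTransaction.get();   // e.g. opening a new Kafka transaction
            } catch (RuntimeException e) {
                lastFailure = e;                 // remember the failure and back off
                Thread.sleep(100L * attempt);
            }
        }
        throw lastFailure;
    }
}
{code}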





[jira] [Created] (FLINK-7497) Allow users to add custom user context stored in state in TwoPhaseCommitSinkFunction

2017-08-24 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-7497:
-

 Summary: Allow users to add custom user context stored in state in 
TwoPhaseCommitSinkFunction
 Key: FLINK-7497
 URL: https://issues.apache.org/jira/browse/FLINK-7497
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski
 Fix For: 1.4.0


Currently when using TwoPhaseCommitSinkFunction, if there is some intermittent 
failure in "beginTransaction", not only does the snapshot that triggered this 
call fail, but any subsequent write requests will also fail, rendering such a 
sink unusable until the application is restarted.

This issue is in code that hasn't been released yet.





[jira] [Updated] (FLINK-7497) Allow users to add custom user context stored in state in TwoPhaseCommitSinkFunction

2017-08-24 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-7497:
--
Description: 
Currently when using TwoPhaseCommitSinkFunction there is no way to store in 
state additional user information that should be coupled with the opened 
transactions (like state shared between transactions).

It is required by FlinkKafkaProducer011, because there we need some place to 
store a pool of used transactional.ids.

  was:
Currently when using TwoPhaseCommitSinkFunction, if there is some intermittent 
failure in "beginTransaction", not only does the snapshot that triggered this 
call fail, but any subsequent write requests will also fail, rendering such a 
sink unusable until the application is restarted.

This issue is in code that hasn't been released yet.

 Issue Type: New Feature  (was: Bug)

> Allow users to add custom user context stored in state in 
> TwoPhaseCommitSinkFunction
> 
>
> Key: FLINK-7497
> URL: https://issues.apache.org/jira/browse/FLINK-7497
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Currently when using TwoPhaseCommitSinkFunction there is no way to store in 
> state additional user information that should be coupled with the opened 
> transactions (like state shared between transactions).
> It is required by FlinkKafkaProducer011, because there we need some place to 
> store a pool of used transactional.ids.
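A minimal sketch of the kind of user context meant here: a pool of 
transactional.ids kept in state alongside the sink's transactions. The class 
and methods are illustrative, not the committed Flink API:

{code:java}
// Hedged sketch; the class and methods are illustrative, not the Flink API.
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Deque;

public class TransactionalIdsPoolSketch implements Serializable {
    private final Deque<String> freeIds = new ArrayDeque<>();
    private int counter;

    // Return a used transactional.id to the pool so a later transaction can reuse it.
    public void release(String transactionalId) {
        freeIds.push(transactionalId);
    }

    // Reuse a free transactional.id if available, otherwise derive a fresh one
    // from the per-subtask prefix.
    public String acquire(String prefix) {
        return freeIds.isEmpty() ? prefix + "-" + (counter++) : freeIds.pop();
    }
}
{code}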





[jira] [Created] (FLINK-7498) Bind together state fields of TwoPhaseCommitSinkFunction

2017-08-24 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-7498:
-

 Summary: Bind together state fields of TwoPhaseCommitSinkFunction
 Key: FLINK-7498
 URL: https://issues.apache.org/jira/browse/FLINK-7498
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski
 Fix For: 1.4.0


Currently when using TwoPhaseCommitSinkFunction, if there is some intermittent 
failure in "beginTransaction", not only does the snapshot that triggered this 
call fail, but any subsequent write requests will also fail, rendering such a 
sink unusable until the application is restarted.

This issue is in code that hasn't been released yet.





[jira] [Updated] (FLINK-7498) Bind together state fields of TwoPhaseCommitSinkFunction

2017-08-24 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-7498:
--
Description: Make sure that the state fields are coupled together between 
checkpoints. This way all transactions opened by one operator instance will be 
restored to the same operator after restoring from a checkpoint.  (was: 
Currently when using TwoPhaseCommitSinkFunction, if there is some intermittent 
failure in "beginTransaction", not only does the snapshot that triggered this 
call fail, but any subsequent write requests will also fail, rendering such a 
sink unusable until the application is restarted.

This issue is in code that hasn't been released yet.)

> Bind together state fields of TwoPhaseCommitSinkFunction
> 
>
> Key: FLINK-7498
> URL: https://issues.apache.org/jira/browse/FLINK-7498
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Make sure that the state fields are coupled together between checkpoints. This 
> way all transactions opened by one operator instance will be restored to the 
> same operator after restoring from a checkpoint.
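A minimal sketch of the binding idea, assuming illustrative field names: all 
fields live in one serializable POJO stored as a single piece of operator 
state, so a restore cannot split one subtask's open and pending transactions 
across different operators:

{code:java}
// Hedged sketch; field names are illustrative, not Flink's internal classes.
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SinkStateSketch<TXN, CONTEXT> implements Serializable {
    public TXN pendingTransaction;              // the currently open transaction
    public List<TXN> pendingCommitTransactions  // snapshotted, awaiting commit
            = new ArrayList<>();
    public CONTEXT userContext;                 // see FLINK-7497
}
{code}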





[jira] [Updated] (FLINK-7498) Bind together state fields of TwoPhaseCommitSinkFunction

2017-08-24 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-7498:
--
Issue Type: Improvement  (was: Bug)

> Bind together state fields of TwoPhaseCommitSinkFunction
> 
>
> Key: FLINK-7498
> URL: https://issues.apache.org/jira/browse/FLINK-7498
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Make sure that the state fields are coupled together between checkpoints. This 
> way all transactions opened by one operator instance will be restored to the 
> same operator after restoring from a checkpoint.





[jira] [Closed] (FLINK-7476) Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction

2017-08-24 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-7476.
-
  Resolution: Won't Fix
Release Note: Currently there is no way in Flink for an operator to fail a 
checkpoint while continuing to execute the Flink application.

> Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction
> --
>
> Key: FLINK-7476
> URL: https://issues.apache.org/jira/browse/FLINK-7476
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Currently when using TwoPhaseCommitSinkFunction, if there is some 
> intermittent failure in "beginTransaction", not only does the snapshot that 
> triggered this call fail, but any subsequent write requests will also fail, 
> rendering such a sink unusable until the application is restarted.
> This issue is in code that hasn't been released yet.





[jira] [Created] (FLINK-7541) Redistribute operator state using OperatorID

2017-08-28 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-7541:
-

 Summary: Redistribute operator state using OperatorID
 Key: FLINK-7541
 URL: https://issues.apache.org/jira/browse/FLINK-7541
 Project: Flink
  Issue Type: Improvement
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski


Currently StateAssignmentOperation relies heavily on the order of new and old 
operators in the task. It should be changed to rely more on the OperatorID.
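A minimal sketch of the proposed matching, using simplified stand-ins for 
Flink's internal classes: previous state is looked up by the stable OperatorID 
rather than by position in the task:

{code:java}
// Hedged sketch; types are simplified stand-ins for Flink's internal classes.
import java.util.HashMap;
import java.util.Map;

class StateAssignmentSketch {
    static Map<String, byte[]> assign(
            Map<String, byte[]> oldStateByOperatorId, Iterable<String> newOperatorIds) {
        Map<String, byte[]> assigned = new HashMap<>();
        for (String operatorId : newOperatorIds) {
            byte[] state = oldStateByOperatorId.get(operatorId); // order-independent lookup
            if (state != null) {
                assigned.put(operatorId, state);
            }
        }
        return assigned;
    }
}
{code}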





[jira] [Created] (FLINK-7561) Add support for pre-aggregation in DataStream API

2017-08-30 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-7561:
-

 Summary: Add support for pre-aggregation in DataStream API
 Key: FLINK-7561
 URL: https://issues.apache.org/jira/browse/FLINK-7561
 Project: Flink
  Issue Type: New Feature
  Components: DataStream API
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski








[jira] [Updated] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-31 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-6988:
--
Fix Version/s: 1.4.0

> Add Apache Kafka 0.11 connector
> ---
>
> Key: FLINK-6988
> URL: https://issues.apache.org/jira/browse/FLINK-6988
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Kafka 0.11 (it will be released very soon) adds support for transactions. 
> Thanks to that, Flink might be able to implement a Kafka sink supporting 
> "exactly-once" semantics. The API changes and the whole transaction support 
> are described in 
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic the implementation of the existing BucketingSink. The new 
> FlinkKafkaProducer011 would
> * upon creation, begin a transaction, store the transaction identifiers into 
> the state, and write all incoming data to an output Kafka topic using that 
> transaction
> * on the `snapshotState` call, flush the data and record in state that the 
> current transaction is pending to be committed
> * on `notifyCheckpointComplete`, commit this pending transaction
> * in case of a crash between `snapshotState` and `notifyCheckpointComplete`, 
> either abort this pending transaction (if not every participant successfully 
> saved the snapshot) or restore and commit it.
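A minimal skeleton of the lifecycle sketched in the bullets above; the method 
names mirror Flink's checkpointing hooks, and the Kafka calls are simplified 
placeholders rather than the real producer API:

{code:java}
// Hedged skeleton of the four bullets above, in order; not the actual
// FlinkKafkaProducer011 code. Kafka calls are simplified placeholders.
import java.util.ArrayList;
import java.util.List;

class TransactionalSinkSketch {
    private Object currentTransaction;                   // bullet 1: opened on creation
    private final List<Object> pendingCommits = new ArrayList<>();

    void open() {
        currentTransaction = beginTransaction();         // begin and remember in state
    }

    void write(String record) {
        // bullet 1: all incoming records are written within the current transaction
    }

    void snapshotState() {
        flush();                                         // bullet 2: flush pending records
        pendingCommits.add(currentTransaction);          // mark as pending commit in state
        currentTransaction = beginTransaction();         // next period gets a fresh txn
    }

    void notifyCheckpointComplete() {
        for (Object txn : pendingCommits) {              // bullet 3: commit once the
            commit(txn);                                 // checkpoint is globally complete
        }
        pendingCommits.clear();
    }

    // Bullet 4: on recovery after a crash between snapshotState() and
    // notifyCheckpointComplete(), the restored state decides whether each
    // pending transaction is committed or aborted.

    private Object beginTransaction() { return new Object(); }
    private void flush() {}
    private void commit(Object txn) {}
}
{code}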





[jira] [Closed] (FLINK-10861) Kafka e2e test fails during tearing down

2018-11-13 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10861.
--
   Resolution: Fixed
Fix Version/s: 1.8.0

 merged commit 9f8df97 into apache:master

> Kafka e2e test fails during tearing down
> 
>
> Key: FLINK-10861
> URL: https://issues.apache.org/jira/browse/FLINK-10861
> Project: Flink
>  Issue Type: Bug
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {noformat}
> flink-end-to-end-tests/test-scripts/kafka-common.sh: line 92: kill: (610) - 
> No such process
> {noformat}
> https://api.travis-ci.org/v3/job/454070232/log.txt





[jira] [Created] (FLINK-10874) Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure

2018-11-14 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-10874:
--

 Summary: Kafka 2.0 connector 
testMigrateFromAtLeastOnceToExactlyOnce failure
 Key: FLINK-10874
 URL: https://issues.apache.org/jira/browse/FLINK-10874
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.8.0
Reporter: Piotr Nowojski


https://api.travis-ci.org/v3/job/454449444/log.txt


{noformat}
Test 
testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
 is running.

16:35:07,894 WARN  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Property 
[transaction.timeout.ms] not specified. Setting it to 360 ms
16:35:07,903 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Starting 
FlinkKafkaInternalProducer (1/1) to produce into default topic 
testMigrateFromAtLeastOnceToExactlyOnce
16:35:08,785 ERROR 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase  - 

Test 
testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
 failed with:
java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:419)
at 
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshotWithLocalState(AbstractStreamOperatorTestHarness.java:505)
at 
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:497)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testRecoverWithChangeSemantics(FlinkKafkaProducerITCase.java:591)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testMigrateFromAtLeastOnceToExactlyOnce(FlinkKafkaProducerITCase.java:569)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: 
Failed to send data to Kafka: This server does not 

[jira] [Closed] (FLINK-10455) Potential Kafka producer leak in case of failures

2018-11-14 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10455.
--
   Resolution: Fixed
Fix Version/s: 1.7.0

merged commit 9f1f25d into apache:master
merged commit b7ba497d0d into apache:release-1.7

> Potential Kafka producer leak in case of failures
> -
>
> Key: FLINK-10455
> URL: https://issues.apache.org/jira/browse/FLINK-10455
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.2
>Reporter: Nico Kruber
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we 
> may get a {{ProducerFencedException}}. The documentation around 
> {{ProducerFencedException}} explicitly states that we should close the 
> producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in 
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in 
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an 
> exception, we don't clean up (nor try to commit) any other transaction.
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} 
> simply iterates over the {{pendingCommitTransactions}} which is not touched 
> during {{close()}}
> Now if we restart the failing job on the same Flink cluster, any resources 
> from the previous attempt will still linger around.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011
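A minimal sketch of the clean-up being asked for: close the producer once it is 
fenced instead of leaking it. Illustrative, not the actual Flink patch:

{code:java}
// Hedged sketch, not the actual fix: once a producer is fenced it is unusable,
// so close it instead of leaking its resources across job restarts.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.ProducerFencedException;

class FencedCommitSketch {
    static void commitOrClose(KafkaProducer<byte[], byte[]> producer) {
        try {
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            producer.close();  // documented requirement after ProducerFencedException
            throw e;           // unchecked, so no throws clause is needed
        }
    }
}
{code}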





[jira] [Commented] (FLINK-10874) Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure

2018-11-14 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16686361#comment-16686361
 ] 

Piotr Nowojski commented on FLINK-10874:


A Google search for this error suggests that it usually happens around a broker 
restarting and re-electing the leaders. Maybe this is some kind of aftershock 
of restarting a broker in another test.

Maybe this will be fixed by: https://issues.apache.org/jira/browse/FLINK-10838

> Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure
> ---
>
> Key: FLINK-10874
> URL: https://issues.apache.org/jira/browse/FLINK-10874
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.8.0
>Reporter: Piotr Nowojski
>Priority: Critical
>
> https://api.travis-ci.org/v3/job/454449444/log.txt
> {noformat}
> Test 
> testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>  is running.
> 
> 16:35:07,894 WARN  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Property 
> [transaction.timeout.ms] not specified. Setting it to 360 ms
> 16:35:07,903 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Starting 
> FlinkKafkaInternalProducer (1/1) to produce into default topic 
> testMigrateFromAtLeastOnceToExactlyOnce
> 16:35:08,785 ERROR 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase  - 
> 
> Test 
> testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>  failed with:
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask 
> (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:419)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshotWithLocalState(AbstractStreamOperatorTestHarness.java:505)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:497)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testRecoverWithChangeSemantics(FlinkKafkaProducerITCase.java:591)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testMigrateFromAtLeastOnceToExactlyOnce(FlinkKafkaProducerITCase.java:569)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JU

[jira] [Reopened] (FLINK-10861) Kafka e2e test fails during tearing down

2018-11-15 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reopened FLINK-10861:


> Kafka e2e test fails during tearing down
> 
>
> Key: FLINK-10861
> URL: https://issues.apache.org/jira/browse/FLINK-10861
> Project: Flink
>  Issue Type: Bug
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {noformat}
> flink-end-to-end-tests/test-scripts/kafka-common.sh: line 92: kill: (610) - 
> No such process
> {noformat}
> https://api.travis-ci.org/v3/job/454070232/log.txt





[jira] [Closed] (FLINK-10624) Extend SQL client end-to-end to test new KafkaTableSink

2018-11-15 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10624.
--
Resolution: Fixed

Merged to master as 4986961734cdd5b465656d227022a3264621f1f7
Merged to release-1.7 as 62f9fbbdaa00650dec21da7f4b32c3e383ac0a8d

> Extend SQL client end-to-end to test new KafkaTableSink
> ---
>
> Key: FLINK-10624
> URL: https://issues.apache.org/jira/browse/FLINK-10624
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Table API & SQL, Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9697, we added support for Kafka 2.0. We should also extend the 
> existing streaming client end-to-end test to cover the new 
> {{KafkaTableSink}} against Kafka 2.0.





[jira] [Closed] (FLINK-10861) Kafka e2e test fails during tearing down

2018-11-15 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10861.
--
   Resolution: Fixed
Fix Version/s: (was: 1.8.0)
   1.7.0

Backported to release-1.7 2f5435fdef73e91aa115a95e19d221eb045bd27b

> Kafka e2e test fails during tearing down
> 
>
> Key: FLINK-10861
> URL: https://issues.apache.org/jira/browse/FLINK-10861
> Project: Flink
>  Issue Type: Bug
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> {noformat}
> flink-end-to-end-tests/test-scripts/kafka-common.sh: line 92: kill: (610) - 
> No such process
> {noformat}
> https://api.travis-ci.org/v3/job/454070232/log.txt





[jira] [Closed] (FLINK-10900) Mark Kafka 2.0 connector as beta feature

2018-11-16 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10900.
--
Resolution: Fixed

merged commit 1680132 into apache:master
commits 146f4340ed..d02af7eda3 into apache:release-1.7

> Mark Kafka 2.0 connector as beta feature
> 
>
> Key: FLINK-10900
> URL: https://issues.apache.org/jira/browse/FLINK-10900
> Project: Flink
>  Issue Type: Task
>  Components: Kafka Connector
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Given the test problems with the Kafka 2.0 connector, we should mark this 
> connector as a beta feature until we have fully understood why so many tests 
> deadlock.





[jira] [Reopened] (FLINK-10455) Potential Kafka producer leak in case of failures

2018-11-16 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reopened FLINK-10455:


> Potential Kafka producer leak in case of failures
> -
>
> Key: FLINK-10455
> URL: https://issues.apache.org/jira/browse/FLINK-10455
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.2
>Reporter: Nico Kruber
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we 
> may get a {{ProducerFencedException}}. The documentation around 
> {{ProducerFencedException}} explicitly states that we should close the 
> producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in 
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in 
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an 
> exception, we don't clean up (nor try to commit) any other transaction.
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} 
> simply iterates over the {{pendingCommitTransactions}} which is not touched 
> during {{close()}}
> Now if we restart the failing job on the same Flink cluster, any resources 
> from the previous attempt will still linger around.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011





[jira] [Closed] (FLINK-10455) Potential Kafka producer leak in case of failures

2018-11-16 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10455.
--
   Resolution: Fixed
Fix Version/s: 1.6.3
   1.5.6

merged commit e493d83 into apache:release-1.5
merged commit 489be82 into apache:release-1.6

> Potential Kafka producer leak in case of failures
> -
>
> Key: FLINK-10455
> URL: https://issues.apache.org/jira/browse/FLINK-10455
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.2
>Reporter: Nico Kruber
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we 
> may get a {{ProducerFencedException}}. The documentation around 
> {{ProducerFencedException}} explicitly states that we should close the 
> producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in 
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in 
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an 
> exception, we don't clean up (nor try to commit) any other transaction.
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} 
> simply iterates over the {{pendingCommitTransactions}} which is not touched 
> during {{close()}}
> Now if we restart the failing job on the same Flink cluster, any resources 
> from the previous attempt will still linger around.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011





[jira] [Closed] (FLINK-10891) Upgrade Kafka client version to 2.0.1

2018-11-16 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10891.
--
   Resolution: Fixed
Fix Version/s: 1.7.0

merged commit 0bf8ad8 into apache:master
merged commit 7f305206a8 into apache:release-1.7

> Upgrade Kafka client version to 2.0.1
> -
>
> Key: FLINK-10891
> URL: https://issues.apache.org/jira/browse/FLINK-10891
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Since the modern Kafka connector only keeps track of the latest version of 
> the Kafka client, with the release of Kafka 2.0.1 we should upgrade the 
> version of the Kafka client Maven dependency.





[jira] [Commented] (FLINK-10662) Refactor the ChannelSelector interface for single selected channel

2018-11-16 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16689674#comment-16689674
 ] 

Piotr Nowojski commented on FLINK-10662:


I think both the current solution and an iterator are more complicated to 
handle/implement. An iterator is probably better than {{int[]}}. As far as I 
know, and from what I discussed with [~StephanEwen], there are no plans to 
select only some of the channels, which is why I was in favour of dropping it. 
The performance benefits were not huge, but a visible ~10% in synthetic network 
benchmarks, if I remember correctly. Always something.

Normally this {{int[] selectChannels()}} simplification would be nice to have, 
but not a must-have for me. However, since broadcasting will need to be handled 
in a special way to avoid data copying, it would leave the {{int[] 
selectChannels()}} interface to ONLY handle single integers.

{{Either}} could be less efficient, and 
{{PredefinedPattern.BROADCASTING}} would also not be used/called after 
implementing the broadcast optimisations.

> Refactor the ChannelSelector interface for single selected channel
> --
>
> Key: FLINK-10662
> URL: https://issues.apache.org/jira/browse/FLINK-10662
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.4, 1.6.1
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> In the discussion of broadcast improvement, [~pnowojski] pointed out the 
> issue of improving the current channel selector.
>  
> In {{ChannelSelector#selectChannels}}, it returns an array of selected 
> channels. But considering the specific implementations, only 
> {{BroadcastPartitioner}} selects all the channels, and the other 
> implementations select one channel. So we can simplify this interface to 
> return a single channel index to benefit performance, and specialize the 
> {{BroadcastPartitioner}} in a more efficient way.
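A minimal sketch of the proposed shape, with illustrative interface and method 
names rather than the merged Flink API: {{selectChannel}} returns a single 
index, and broadcast becomes a specialized partitioner instead of an array 
result:

{code:java}
// Hedged sketch; interface and method names are illustrative, not the merged API.
interface ChannelSelectorSketch<T> {
    int selectChannel(T record, int numberOfChannels);  // one index, no array allocation
}

class RoundRobinSelectorSketch<T> implements ChannelSelectorSketch<T> {
    private int next = -1;

    @Override
    public int selectChannel(T record, int numberOfChannels) {
        next = (next + 1) % numberOfChannels;
        return next;
    }
}
// Broadcast would no longer go through selectChannel at all: a specialized
// broadcast partitioner can write one serialized copy to every channel.
{code}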





[jira] [Updated] (FLINK-4387) Instability in KvStateClientTest.testClientServerIntegration()

2018-11-19 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-4387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-4387:
--
Fix Version/s: (was: 1.7.1)
   (was: 1.6.3)

> Instability in KvStateClientTest.testClientServerIntegration()
> --
>
> Key: FLINK-4387
> URL: https://issues.apache.org/jira/browse/FLINK-4387
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.5.0, 1.6.0
>Reporter: Robert Metzger
>Assignee: Nico Kruber
>Priority: Major
>  Labels: test-stability
> Fix For: 1.2.0, 1.8.0
>
>
> According to this log: 
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/151491745/log.txt
> the {{KvStateClientTest}} didn't complete.
> {code}
> "main" #1 prio=5 os_prio=0 tid=0x7fb2b400a000 nid=0x29dc in Object.wait() 
> [0x7fb2bcb3b000]
>java.lang.Thread.State: WAITING (on object monitor)
>   at java.lang.Object.wait(Native Method)
>   - waiting on <0xf7c049a0> (a 
> io.netty.util.concurrent.DefaultPromise)
>   at java.lang.Object.wait(Object.java:502)
>   at 
> io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:254)
>   - locked <0xf7c049a0> (a 
> io.netty.util.concurrent.DefaultPromise)
>   at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:32)
>   at 
> org.apache.flink.runtime.query.netty.KvStateServer.shutDown(KvStateServer.java:185)
>   at 
> org.apache.flink.runtime.query.netty.KvStateClientTest.testClientServerIntegration(KvStateClientTest.java:680)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {code}
> and
> {code}
> Exception in thread "globalEventExecutor-1-3" java.lang.AssertionError
>   at 
> io.netty.util.concurrent.AbstractScheduledEventExecutor.pollScheduledTask(AbstractScheduledEventExecutor.java:83)
>   at 
> io.netty.util.concurrent.GlobalEventExecutor.fetchFromScheduledTaskQueue(GlobalEventExecutor.java:110)
>   at 
> io.netty.util.concurrent.GlobalEventExecutor.takeTask(GlobalEventExecutor.java:95)
>   at 
> io.netty.util.concurrent.GlobalEventExecutor$TaskRunner.run(GlobalEventExecutor.java:226)
>   at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
>   at java.lang.Thread.run(Thread.java:745)
> {code}





[jira] [Commented] (FLINK-10662) Refactor the ChannelSelector interface for single selected channel

2018-11-22 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695679#comment-16695679
 ] 

Piotr Nowojski commented on FLINK-10662:


Sounds good :)

> Refactor the ChannelSelector interface for single selected channel
> --
>
> Key: FLINK-10662
> URL: https://issues.apache.org/jira/browse/FLINK-10662
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.4, 1.6.1
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> In the discussion of broadcast improvement, [~pnowojski] pointed out the 
> issue of improving the current channel selector.
>  
> In {{ChannelSelector#selectChannels}}, it returns an array of selected 
> channels. But considering the specific implementations, only 
> {{BroadcastPartitioner}} selects all the channels, and the other 
> implementations select one channel. So we can simplify this interface to 
> return a single channel index to benefit performance, and specialize the 
> {{BroadcastPartitioner}} in a more efficient way.





[jira] [Closed] (FLINK-10942) Deduplicate common codes in OutputEmitterTest

2018-11-26 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10942.
--
   Resolution: Fixed
Fix Version/s: (was: 1.7.1)
   1.8.0

 merged commit 17e3576 into apache:master

> Deduplicate common codes in OutputEmitterTest
> -
>
> Key: FLINK-10942
> URL: https://issues.apache.org/jira/browse/FLINK-10942
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network, Tests
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> There is a lot of duplicated code in {{OutputEmitterTest}}, which makes it 
> difficult to maintain. So it is necessary to abstract the common code to make 
> it simpler, which benefits the following refactoring work on the 
> {{ChannelSelector}} interface.





[jira] [Closed] (FLINK-10367) Avoid recursion stack overflow during releasing SingleInputGate

2018-11-27 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10367.
--
   Resolution: Fixed
Fix Version/s: 1.8.0
   1.6.3

merged commit b379316 into apache:master
merged commit 73858ea6e7 into apache:release-1.6

not yet merged to release-1.7 because that release is in progress


> Avoid recursion stack overflow during releasing SingleInputGate
> ---
>
> Key: FLINK-10367
> URL: https://issues.apache.org/jira/browse/FLINK-10367
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.8.0
>
>
> For a task failure or canceling, {{SingleInputGate#releaseAllResources}} 
> will be invoked before the task exits.
> In the process of {{SingleInputGate#releaseAllResources}}, we first loop to 
> release all the input channels, then destroy the {{BufferPool}}. 
> {{RemoteInputChannel#releaseAllResources}} will return floating buffers to 
> the {{BufferPool}}, which assigns each recycled buffer to the other listeners 
> ({{RemoteInputChannel}}s).
> A recursive call may occur in this process. If the listener has already been 
> released, it will directly recycle this buffer to the {{BufferPool}}, which 
> picks another listener to notify of the available buffer. The above process 
> may be invoked repeatedly in a recursive way.
> If there are many input channels registered as listeners in the 
> {{BufferPool}}, the recursion will cause a {{StackOverflow}} error. In our 
> testing job, a scale of 10,000 input channels once caused this error.
> I can think of two ways to solve this potential problem:
>  # When an input channel is released, it should notify the {{BufferPool}} to 
> unregister this listener; otherwise the two become inconsistent.
>  # {{SingleInputGate}} should destroy the {{BufferPool}} first, then loop to 
> release all the internal input channels. That way, all the listeners in the 
> {{BufferPool}} will be removed during destruction, and the input channel will 
> not have further interactions during 
> {{RemoteInputChannel#releaseAllResources}}.
> I prefer the second way to solve this problem, because we do not want to 
> add another interface method for removing a buffer listener; furthermore, the 
> internal data structure in {{BufferPool}} currently cannot support removing 
> a listener directly.
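A minimal sketch of the second option, using simplified stand-ins for Flink's 
internals: destroying the buffer pool first empties its listener queue, so 
releasing the channels afterwards cannot build up a recursive notification 
chain:

{code:java}
// Hedged sketch of option 2; names are simplified stand-ins for Flink internals.
import java.util.List;

class InputGateReleaseSketch {
    interface BufferPoolLike { void lazyDestroy(); }

    private final BufferPoolLike bufferPool;
    private final List<AutoCloseable> inputChannels;

    InputGateReleaseSketch(BufferPoolLike bufferPool, List<AutoCloseable> inputChannels) {
        this.bufferPool = bufferPool;
        this.inputChannels = inputChannels;
    }

    void releaseAllResources() throws Exception {
        bufferPool.lazyDestroy();                // first: drops all registered listeners
        for (AutoCloseable channel : inputChannels) {
            channel.close();                     // recycling buffers now notifies nobody,
        }                                        // so no recursive chain can build up
    }
}
{code}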





[jira] [Closed] (FLINK-10820) Simplify the RebalancePartitioner implementation

2018-11-28 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10820.
--
   Resolution: Fixed
Fix Version/s: 1.8.0

Merged as 2d511b0624 into the master.

> Simplify the RebalancePartitioner implementation
> 
>
> Key: FLINK-10820
> URL: https://issues.apache.org/jira/browse/FLINK-10820
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.8.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> _The current {{RebalancePartitioner}} implementation seems a little hacky: it 
> selects a random number as the first channel index, and the following 
> selections proceed from this random index in round-robin fashion._
> _Especially for the corner case of {{numChannels = Integer.MAX_VALUE}}, it 
> would trigger a new random index once the last channel index is reached. 
> Actually, the random index should be selected only once, the first time._
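A minimal sketch of the simplification, illustrative rather than the merged 
implementation: the random starting channel is drawn exactly once, and every 
later selection just advances round-robin:

{code:java}
// Hedged sketch: the random start is drawn exactly once, then plain round-robin.
import java.util.concurrent.ThreadLocalRandom;

class RebalanceSketch {
    private int nextChannel = -1;  // -1 marks "random start not drawn yet"

    int selectChannel(int numberOfChannels) {
        if (nextChannel < 0) {
            nextChannel = ThreadLocalRandom.current().nextInt(numberOfChannels);
        } else {
            nextChannel = (nextChannel + 1) % numberOfChannels;  // no re-draw at wrap-around
        }
        return nextChannel;
    }
}
{code}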





[jira] [Reopened] (FLINK-10367) Avoid recursion stack overflow during releasing SingleInputGate

2018-11-29 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reopened FLINK-10367:


> Avoid recursion stack overflow during releasing SingleInputGate
> ---
>
> Key: FLINK-10367
> URL: https://issues.apache.org/jira/browse/FLINK-10367
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.8.0, 1.7.1
>
>
> For a task failure or canceling, {{SingleInputGate#releaseAllResources}} 
> will be invoked before the task exits.
> In the process of {{SingleInputGate#releaseAllResources}}, we first loop to 
> release all the input channels, then destroy the {{BufferPool}}. 
> {{RemoteInputChannel#releaseAllResources}} will return floating buffers to 
> the {{BufferPool}}, which assigns each recycled buffer to the other listeners 
> ({{RemoteInputChannel}}s).
> A recursive call may occur in this process. If the listener has already been 
> released, it will directly recycle this buffer to the {{BufferPool}}, which 
> picks another listener to notify of the available buffer. The above process 
> may be invoked repeatedly in a recursive way.
> If there are many input channels registered as listeners in the 
> {{BufferPool}}, the recursion will cause a {{StackOverflow}} error. In our 
> testing job, a scale of 10,000 input channels once caused this error.
> I can think of two ways to solve this potential problem:
>  # When an input channel is released, it should notify the {{BufferPool}} to 
> unregister this listener; otherwise the two become inconsistent.
>  # {{SingleInputGate}} should destroy the {{BufferPool}} first, then loop to 
> release all the internal input channels. That way, all the listeners in the 
> {{BufferPool}} will be removed during destruction, and the input channel will 
> not have further interactions during 
> {{RemoteInputChannel#releaseAllResources}}.
> I prefer the second way to solve this problem, because we do not want to 
> add another interface method for removing a buffer listener; furthermore, the 
> internal data structure in {{BufferPool}} currently cannot support removing 
> a listener directly.





[jira] [Closed] (FLINK-10367) Avoid recursion stack overflow during releasing SingleInputGate

2018-11-29 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10367.
--
   Resolution: Fixed
Fix Version/s: 1.7.1

merged as 64b4ef513a to release-1.7

> Avoid recursion stack overflow during releasing SingleInputGate
> ---
>
> Key: FLINK-10367
> URL: https://issues.apache.org/jira/browse/FLINK-10367
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.8.0, 1.7.1
>
>
> For a task failure or canceling, {{SingleInputGate#releaseAllResources}} 
> will be invoked before the task exits.
> In the process of {{SingleInputGate#releaseAllResources}}, we first loop to 
> release all the input channels, then destroy the {{BufferPool}}. 
> {{RemoteInputChannel#releaseAllResources}} will return floating buffers to 
> the {{BufferPool}}, which assigns each recycled buffer to the other listeners 
> ({{RemoteInputChannel}}s).
> A recursive call may occur in this process. If the listener has already been 
> released, it will directly recycle this buffer to the {{BufferPool}}, which 
> picks another listener to notify of the available buffer. The above process 
> may be invoked repeatedly in a recursive way.
> If there are many input channels registered as listeners in the 
> {{BufferPool}}, the recursion will cause a {{StackOverflow}} error. In our 
> testing job, a scale of 10,000 input channels once caused this error.
> I can think of two ways to solve this potential problem:
>  # When an input channel is released, it should notify the {{BufferPool}} to 
> unregister this listener; otherwise the two become inconsistent.
>  # {{SingleInputGate}} should destroy the {{BufferPool}} first, then loop to 
> release all the internal input channels. That way, all the listeners in the 
> {{BufferPool}} will be removed during destruction, and the input channel will 
> not have further interactions during 
> {{RemoteInputChannel#releaseAllResources}}.
> I prefer the second way to solve this problem, because we do not want to 
> add another interface method for removing a buffer listener; furthermore, the 
> internal data structure in {{BufferPool}} currently cannot support removing 
> a listener directly.





[jira] [Closed] (FLINK-10629) FlinkKafkaProducerITCase.testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify failed on Travis

2018-11-30 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10629.
--
Resolution: Cannot Reproduce

I haven't seen a Kafka 2.0 test failure since upgrading Kafka to {{2.0.1}}. 
Closing this issue for now. Please re-open if you spot this error again.

> FlinkKafkaProducerITCase.testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify
>  failed on Travis
> --
>
> Key: FLINK-10629
> URL: https://issues.apache.org/jira/browse/FLINK-10629
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Piotr Nowojski
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.8.0
>
>
> The 
> {{FlinkKafkaProducerITCase.testFlinkKafkaProducer10FailTransactionCoordinatorBeforeNotify}}
>  failed on Travis.
> https://api.travis-ci.org/v3/job/443777257/log.txt





[jira] [Closed] (FLINK-10629) FlinkKafkaProducerITCase.testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify failed on Travis

2018-11-30 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10629.
--
   Resolution: Cannot Reproduce
Fix Version/s: (was: 1.8.0)
   1.7.0

I haven't seen a Kafka 2.0 test failure since upgrading Kafka to {{2.0.1}}. 
Closing this issue for now. Please re-open if you spot this error again.

> FlinkKafkaProducerITCase.testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify
>  failed on Travis
> --
>
> Key: FLINK-10629
> URL: https://issues.apache.org/jira/browse/FLINK-10629
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Piotr Nowojski
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.7.0
>
>
> The 
> {{FlinkKafkaProducerITCase.testFlinkKafkaProducer10FailTransactionCoordinatorBeforeNotify}}
>  failed on Travis.
> https://api.travis-ci.org/v3/job/443777257/log.txt





[jira] [Reopened] (FLINK-10629) FlinkKafkaProducerITCase.testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify failed on Travis

2018-11-30 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reopened FLINK-10629:


> FlinkKafkaProducerITCase.testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify
>  failed on Travis
> --
>
> Key: FLINK-10629
> URL: https://issues.apache.org/jira/browse/FLINK-10629
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Piotr Nowojski
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.7.0
>
>
> The 
> {{FlinkKafkaProducerITCase.testFlinkKafkaProducer10FailTransactionCoordinatorBeforeNotify}}
>  failed on Travis.
> https://api.travis-ci.org/v3/job/443777257/log.txt





[jira] [Closed] (FLINK-10837) Kafka 2.0 test KafkaITCase.testOneToOneSources dead locked on deleteTestTopic?

2018-11-30 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10837.
--
   Resolution: Cannot Reproduce
Fix Version/s: 1.7.0

I haven't seen a Kafka 2.0 test failure since upgrading Kafka to {{2.0.1}}. 
Closing this issue for now. Please re-open if you spot this error again.

> Kafka 2.0 test KafkaITCase.testOneToOneSources dead locked on deleteTestTopic?
> --
>
> Key: FLINK-10837
> URL: https://issues.apache.org/jira/browse/FLINK-10837
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.7.0
>Reporter: Piotr Nowojski
>Priority: Major
> Fix For: 1.7.0
>
>
> https://api.travis-ci.org/v3/job/452439034/log.txt
> {noformat}
> Tests in error: 
>   
> KafkaITCase.testOneToOneSources:97->KafkaConsumerTestBase.runOneToOneExactlyOnceTest:875->KafkaTestBase.deleteTestTopic:206->Object.wait:502->Object.wait:-2
>  » TestTimedOut
> {noformat}
> {noformat}
> java.lang.InterruptedException
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.deleteTestTopic(KafkaTestEnvironmentImpl.java:158)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.deleteTestTopic(KafkaTestBase.java:206)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runOneToOneExactlyOnceTest(KafkaConsumerTestBase.java:875)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testOneToOneSources(KafkaITCase.java:97)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> {noformat}





[jira] [Closed] (FLINK-10874) Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure

2018-11-30 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10874.
--
   Resolution: Cannot Reproduce
Fix Version/s: 1.7.0

I haven't seen a Kafka 2.0 test failure since upgrading Kafka to {{2.0.1}}. 
Closing this issue for now. Please re-open if you spot this error again.

> Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure
> ---
>
> Key: FLINK-10874
> URL: https://issues.apache.org/jira/browse/FLINK-10874
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.8.0
>Reporter: Piotr Nowojski
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> https://api.travis-ci.org/v3/job/454449444/log.txt
> {noformat}
> Test 
> testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>  is running.
> 
> 16:35:07,894 WARN  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Property 
> [transaction.timeout.ms] not specified. Setting it to 360 ms
> 16:35:07,903 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Starting 
> FlinkKafkaInternalProducer (1/1) to produce into default topic 
> testMigrateFromAtLeastOnceToExactlyOnce
> 16:35:08,785 ERROR 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase  - 
> 
> Test 
> testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>  failed with:
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask 
> (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:419)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshotWithLocalState(AbstractStreamOperatorTestHarness.java:505)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:497)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testRecoverWithChangeSemantics(FlinkKafkaProducerITCase.java:591)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testMigrateFromAtLeastOnceToExactlyOnce(FlinkKafkaProducerITCase.java:569)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>   at 
> org.apache.maven.surefire.

[jira] [Created] (FLINK-11038) Rewrite Kafka at-least-once IT cases

2018-11-30 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-11038:
--

 Summary: Rewrite Kafka at-least-once IT cases
 Key: FLINK-11038
 URL: https://issues.apache.org/jira/browse/FLINK-11038
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.7.0
Reporter: Piotr Nowojski


Currently these tests use {{NetworkFailuresProxy}}, which is unstable both for 
Kafka 0.11 in exactly-once mode (around 50% of the runs live-lock) and for 
Kafka 2.0.

These tests should be rewritten to SIGKILL the Flink process that does the 
writing: either as an ITCase that SIGKILLs a task manager, or as a test 
harness that SIGKILLs/exits the harness process.

We cannot simply use the test harness and leave it unclosed to simulate a 
failure, because we want to make sure the records were flushed during the 
checkpoint. If we do not SIGKILL the process, the Kafka client's background 
threads can just send those records for us.
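
For illustration, such a test could be shaped roughly like this (a minimal 
sketch; {{KafkaWriterMain}}, {{waitForCheckpointToComplete}} and 
{{assertExactlyOnceInKafka}} are hypothetical helpers, not existing Flink 
code):

{code:java}
// Sketch only: run the writer in a separate JVM and SIGKILL it, so that the
// Kafka client's background threads cannot flush the records for us.
Process writer = new ProcessBuilder(
        "java", "-cp", System.getProperty("java.class.path"),
        "com.example.KafkaWriterMain", topic) // hypothetical main class
    .inheritIO()
    .start();

waitForCheckpointToComplete(); // hypothetical helper: blocks until a checkpoint is confirmed
writer.destroyForcibly();      // on POSIX systems this sends SIGKILL
writer.waitFor();              // reap the killed process before asserting

assertExactlyOnceInKafka(topic); // hypothetical helper: reads the topic back and verifies records
{code}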



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11038) Rewrite Kafka at-least-once IT cases

2018-11-30 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-11038:
---
Description: 
Currently these tests use {{NetworkFailuresProxy}}, which is unstable both for 
Kafka 0.11 in exactly-once mode (around 50% of the runs live-lock) and for 
Kafka 2.0 (and because of that, the {{testOneToOneAtLeastOnceRegularSink}} and 
{{testOneToOneAtLeastOnceCustomOperator}} tests are currently disabled).

These tests should be rewritten to SIGKILL the Flink process that does the 
writing: either as an ITCase that SIGKILLs a task manager, or as a test 
harness that SIGKILLs/exits the harness process.

We cannot simply use the test harness and leave it unclosed to simulate a 
failure, because we want to make sure the records were flushed during the 
checkpoint. If we do not SIGKILL the process, the Kafka client's background 
threads can just send those records for us.

  was:
Currently these tests use {{NetworkFailuresProxy}}, which is unstable both for 
Kafka 0.11 in exactly-once mode (around 50% of the runs live-lock) and for 
Kafka 2.0.

These tests should be rewritten to SIGKILL the Flink process that does the 
writing: either as an ITCase that SIGKILLs a task manager, or as a test 
harness that SIGKILLs/exits the harness process.

We cannot simply use the test harness and leave it unclosed to simulate a 
failure, because we want to make sure the records were flushed during the 
checkpoint. If we do not SIGKILL the process, the Kafka client's background 
threads can just send those records for us.


> Rewrite Kafka at-least-once IT cases
> 
>
> Key: FLINK-11038
> URL: https://issues.apache.org/jira/browse/FLINK-11038
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.7.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> Currently these tests use {{NetworkFailuresProxy}}, which is unstable both for 
> Kafka 0.11 in exactly-once mode (around 50% of the runs live-lock) and for 
> Kafka 2.0 (and because of that, the {{testOneToOneAtLeastOnceRegularSink}} and 
> {{testOneToOneAtLeastOnceCustomOperator}} tests are currently disabled).
> These tests should be rewritten to SIGKILL the Flink process that does the 
> writing: either as an ITCase that SIGKILLs a task manager, or as a test 
> harness that SIGKILLs/exits the harness process.
> We cannot simply use the test harness and leave it unclosed to simulate a 
> failure, because we want to make sure the records were flushed during the 
> checkpoint. If we do not SIGKILL the process, the Kafka client's background 
> threads can just send those records for us.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10838) Rewrite Kafka tests that fail Kafka brokers

2018-11-30 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10838.
--
   Resolution: Won't Fix
Fix Version/s: (was: 1.8.0)

After further investigation, we cannot rewrite the tests that fail Kafka 
brokers, because their whole point is to test the behaviour when Kafka brokers 
fail. We already have very similar tests that simply do not close the test 
harness in order to simulate Flink's failure.

Rewriting the tests that use the network failure proxy is still valid and was 
extracted to a separate issue: 
https://issues.apache.org/jira/browse/FLINK-11038

> Rewrite Kafka tests that fail Kafka brokers
> ---
>
> Key: FLINK-10838
> URL: https://issues.apache.org/jira/browse/FLINK-10838
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.7.0
>Reporter: Piotr Nowojski
>Priority: Critical
>
> Currently we have many tests that, in order to test for example 
> `at-least-once`, fail Kafka brokers or break the network connection to them. 
> It seems that those tests are testing Kafka brokers more than our own code, 
> and in the process (because of bugs in Kafka) they cause lots of false 
> positives.
> We could try to rewrite those tests to spawn one of Flink's Task Managers in 
> a separate process, and instead of failing the network/Kafka broker we could 
> SIGKILL the Task Manager, without touching/affecting the Zookeeper/Kafka 
> brokers. This should improve the stability of our tests.
> CC [~till.rohrmann] [~aljoscha]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11042) testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify is an invalid test

2018-11-30 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-11042:
--

 Summary: 
testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify is an invalid test
 Key: FLINK-11042
 URL: https://issues.apache.org/jira/browse/FLINK-11042
 Project: Flink
  Issue Type: Bug
  Components: Kinesis Connector
Affects Versions: 1.8.0
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski


The main point of testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify 
is to fail the transaction coordinator (located via 
{{kafkaProducer.getTransactionCoordinatorId();}}) and we expect this to cause 
a failure of the Flink job. However, that is not always the case. Maybe 
because the transaction coordinator can be re-elected before {{KafkaProducer}} 
even notices it, or for whatever other reason, sometimes the failure does not 
happen.

Because of a bug in the test, if the failure has not happened, the test will 
not fail either.

Generally speaking this test is invalid and should be dropped.
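
For context, the failure injection presumably looked roughly like this (a 
hypothetical reconstruction, not the actual test code; 
{{kafkaServer.restartBroker}} is an assumed test-fixture method):

{code:java}
// Sketch only: kill the broker that currently acts as the transaction
// coordinator and expect the next snapshot/commit to fail the Flink job.
int coordinatorId = kafkaProducer.getTransactionCoordinatorId();
kafkaServer.restartBroker(coordinatorId); // assumed fixture call that kills the coordinator's broker

// In practice the coordinator may be re-elected before KafkaProducer notices,
// so the expected failure is not guaranteed, which is exactly why this test
// is invalid.
{code}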



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10874) Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure

2018-12-03 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707160#comment-16707160
 ] 

Piotr Nowojski commented on FLINK-10874:


merged commit 59ebdb2 into apache:master
merged commit 7b23bf69e8 into apache:release-1.7

> Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure
> ---
>
> Key: FLINK-10874
> URL: https://issues.apache.org/jira/browse/FLINK-10874
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.8.0
>Reporter: Piotr Nowojski
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> https://api.travis-ci.org/v3/job/454449444/log.txt
> {noformat}
> Test 
> testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>  is running.
> 
> 16:35:07,894 WARN  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Property 
> [transaction.timeout.ms] not specified. Setting it to 360 ms
> 16:35:07,903 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Starting 
> FlinkKafkaInternalProducer (1/1) to produce into default topic 
> testMigrateFromAtLeastOnceToExactlyOnce
> 16:35:08,785 ERROR 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase  - 
> 
> Test 
> testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>  failed with:
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask 
> (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:419)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshotWithLocalState(AbstractStreamOperatorTestHarness.java:505)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:497)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testRecoverWithChangeSemantics(FlinkKafkaProducerITCase.java:591)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testMigrateFromAtLeastOnceToExactlyOnce(FlinkKafkaProducerITCase.java:569)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153

[jira] [Assigned] (FLINK-35351) Restore from unaligned checkpoints with a custom partitioner fails.

2024-05-14 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-35351:
--

Assignee: Dmitriy Linevich

> Restore from unaligned checkpoints with a custom partitioner fails.
> ---
>
> Key: FLINK-35351
> URL: https://issues.apache.org/jira/browse/FLINK-35351
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Dmitriy Linevich
>Assignee: Dmitriy Linevich
>Priority: Major
>
> We encountered a problem when using a custom partitioner with unaligned 
> checkpoints. The bug reproduces under the following steps:
>  # Run a job with graph: Source[2]->Sink[3], the custom partitioner applied 
> after the Source task.
>  # Make a checkpoint.
>  # Restore from the checkpoint with a different source parallelism: 
> Source[1]->Sink[3].
>  # An exception is thrown.
> This issue does not occur when restoring with the same parallelism or when 
> changing the Sink parallelism. The exception only occurs when the parallelism 
> of the Source is changed while the Sink parallelism remains the same.
> See the exception below and the test code at the end. 
> {code:java}
> [db13789c52b80aad852c53a0afa26247] Task [Sink: sink (3/3)#0] WARN  Sink: sink 
> (3/3)#0 
> (be1d158c2e77fc9ed9e3e5d9a8431dc2_0a448493b4782967b150582570326227_2_0) 
> switched from RUNNING to FAILED with failure cause:
> java.io.IOException: Can't get next record for channel 
> InputChannelInfo{gateIdx=0, inputChannelIdx=0}
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:600)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:930)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:879)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:960)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) 
> [classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:753) 
> [classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) 
> [classes/:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> Caused by: java.io.IOException: Corrupt stream, found tag: -1
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:222)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:44)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer$VirtualChannel.getNextRecord(DemultiplexingRecordDeserializer.java:79)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:154)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:54)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:103)
>  ~[classes/:?]
>     ... 10 more {code}
> We discovered that this issue occurs due to an optimization in the 
> [StateAssignmentOperation::reDistributeInputChannelStates|https://github.com/ap

[jira] [Commented] (FLINK-35351) Restore from unaligned checkpoints with a custom partitioner fails.

2024-05-14 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846231#comment-17846231
 ] 

Piotr Nowojski commented on FLINK-35351:


Hi [~lda-dima], thanks for reporting this issue and what looks like a correct 
analysis. However, I think the proposed solution goes in the wrong direction: 
we cannot disallow rescaling, as that might prevent jobs from recovering in 
case of, for example, the loss of a TaskManager. Note that the problem 
probably happens due to the use of a custom partitioner here. AFAIR unaligned 
checkpoints are (most likely for exactly the same reason that you discovered 
in this bug) enabled only for keyed exchanges. [Pointwise/broadcast connections 
with unaligned checkpoints are 
unsupported|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#certain-data-distribution-patterns-are-not-checkpointed].
 There is code somewhere in the codebase that checks whether unaligned 
checkpoints should be enabled for a given partitioning. I would guess that 
custom partitioners are mishandled there?

> Restore from unaligned checkpoints with a custom partitioner fails.
> ---
>
> Key: FLINK-35351
> URL: https://issues.apache.org/jira/browse/FLINK-35351
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Dmitriy Linevich
>Assignee: Dmitriy Linevich
>Priority: Major
>
> We encountered a problem when using a custom partitioner with unaligned 
> checkpoints. The bug reproduces under the following steps:
>  # Run a job with graph: Source[2]->Sink[3], the custom partitioner applied 
> after the Source task.
>  # Make a checkpoint.
>  # Restore from the checkpoint with a different source parallelism: 
> Source[1]->Sink[3].
>  # An exception is thrown.
> This issue does not occur when restoring with the same parallelism or when 
> changing the Sink parallelism. The exception only occurs when the parallelism 
> of the Source is changed while the Sink parallelism remains the same.
> See the exception below and the test code at the end. 
> {code:java}
> [db13789c52b80aad852c53a0afa26247] Task [Sink: sink (3/3)#0] WARN  Sink: sink 
> (3/3)#0 
> (be1d158c2e77fc9ed9e3e5d9a8431dc2_0a448493b4782967b150582570326227_2_0) 
> switched from RUNNING to FAILED with failure cause:
> java.io.IOException: Can't get next record for channel 
> InputChannelInfo{gateIdx=0, inputChannelIdx=0}
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:600)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:930)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:879)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:960)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) 
> [classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:753) 
> [classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) 
> [classes/:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> Caused by: java.io.IOException: Corrupt stream, found tag: -1
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:222)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:44)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serializatio

[jira] [Comment Edited] (FLINK-35351) Restore from unaligned checkpoints with a custom partitioner fails.

2024-05-14 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846231#comment-17846231
 ] 

Piotr Nowojski edited comment on FLINK-35351 at 5/14/24 9:11 AM:
-

Hi [~lda-dima], thanks for reporting this issue and what looks like a correct 
analysis. However, I think the proposed solution goes in the wrong direction: 
we cannot disallow rescaling, as that might prevent jobs from recovering in 
case of, for example, the loss of a TaskManager. Note that the problem 
probably happens due to the use of a custom partitioner here. AFAIR unaligned 
checkpoints are (most likely for exactly the same reason that you discovered 
in this bug) enabled only for keyed exchanges. [Pointwise/broadcast connections 
with unaligned checkpoints are 
unsupported|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#certain-data-distribution-patterns-are-not-checkpointed].
 There is code somewhere in the codebase that checks whether unaligned 
checkpoints should be enabled for a given partitioning. I would guess that 
custom partitioners are mishandled there?

Relevant code pointers:
* 
{{org.apache.flink.runtime.checkpoint.CheckpointOptions.AlignmentType#FORCED_ALIGNED}}
* 
{{org.apache.flink.streaming.runtime.io.RecordWriterOutput#supportsUnalignedCheckpoints}}
* 
{{org.apache.flink.streaming.api.graph.StreamGraphGenerator#shouldDisableUnalignedCheckpointing}}
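
To make the suspicion concrete, the check presumably needs to treat a custom 
partitioner like any other non-keyed exchange (a hypothetical sketch, not the 
actual Flink source; the real logic lives in the classes listed above, and the 
partitioner classes come from 
{{org.apache.flink.streaming.runtime.partitioner}}):

{code:java}
// Sketch only: keyed exchanges are safe to rescale under unaligned
// checkpoints; everything else, including CustomPartitionerWrapper, should
// force aligned barriers.
private static boolean shouldDisableUnalignedCheckpointing(StreamPartitioner<?> partitioner) {
    return !(partitioner instanceof KeyGroupStreamPartitioner);
}
{code}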



was (Author: pnowojski):
Hi [~lda-dima], thanks for reporting this issue and what looks like a correct 
analysis. However, I think the proposed solution goes in the wrong direction: 
we cannot disallow rescaling, as that might prevent jobs from recovering in 
case of, for example, the loss of a TaskManager. Note that the problem 
probably happens due to the use of a custom partitioner here. AFAIR unaligned 
checkpoints are (most likely for exactly the same reason that you discovered 
in this bug) enabled only for keyed exchanges. [Pointwise/broadcast connections 
with unaligned checkpoints are 
unsupported|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#certain-data-distribution-patterns-are-not-checkpointed].
 There is code somewhere in the codebase that checks whether unaligned 
checkpoints should be enabled for a given partitioning. I would guess that 
custom partitioners are mishandled there?

> Restore from unaligned checkpoints with a custom partitioner fails.
> ---
>
> Key: FLINK-35351
> URL: https://issues.apache.org/jira/browse/FLINK-35351
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Dmitriy Linevich
>Assignee: Dmitriy Linevich
>Priority: Major
>
> We encountered a problem when using a custom partitioner with unaligned 
> checkpoints. The bug reproduces under the following steps:
>  # Run a job with graph: Source[2]->Sink[3], the custom partitioner applied 
> after the Source task.
>  # Make a checkpoint.
>  # Restore from the checkpoint with a different source parallelism: 
> Source[1]->Sink[3].
>  # An exception is thrown.
> This issue does not occur when restoring with the same parallelism or when 
> changing the Sink parallelism. The exception only occurs when the parallelism 
> of the Source is changed while the Sink parallelism remains the same.
> See the exception below and the test code at the end. 
> {code:java}
> [db13789c52b80aad852c53a0afa26247] Task [Sink: sink (3/3)#0] WARN  Sink: sink 
> (3/3)#0 
> (be1d158c2e77fc9ed9e3e5d9a8431dc2_0a448493b4782967b150582570326227_2_0) 
> switched from RUNNING to FAILED with failure cause:
> java.io.IOException: Can't get next record for channel 
> InputChannelInfo{gateIdx=0, inputChannelIdx=0}
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:600)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:930)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:879)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:960)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) 
> [classes/:?]
>     a

[jira] [Commented] (FLINK-33109) Watermark alignment not applied after recovery from checkpoint

2024-05-15 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846597#comment-17846597
 ] 

Piotr Nowojski commented on FLINK-33109:


Hi!

{quote}
I guess this bug has been fixed in 1.17.2 and 1.18.
{quote}

Has this been confirmed? [~YordanPavlov], has this issue been fixed for you?

> Watermark alignment not applied after recovery from checkpoint
> --
>
> Key: FLINK-33109
> URL: https://issues.apache.org/jira/browse/FLINK-33109
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.1
>Reporter: Yordan Pavlov
>Priority: Major
> Attachments: WatermarkTest-1.scala, 
> image-2023-09-18-15-40-06-868.png, image-2023-09-18-15-46-16-106.png
>
>
> I am observing a problem where, after recovery from a checkpoint, the Kafka 
> source watermarks start to diverge, not honoring the watermark alignment 
> setting I have applied.
> I have a Kafka source which reads a topic with 32 partitions. I am applying 
> the following watermark strategy:
> {code:java}
> new EventAwareWatermarkStrategy[KeyedKafkaSourceMessage](msg => 
> msg.value.getTimestamp)
>       .withWatermarkAlignment("alignment-sources-group", 
> time.Duration.ofMillis(sourceWatermarkAlignmentBlocks)){code}
>  
> This works great up until my job needs to recover from a checkpoint. Once the 
> recovery takes place, no alignment takes place any more. This can best be 
> illustrated by looking at the watermark metrics for various operators in the 
> image:
> !image-2023-09-18-15-40-06-868.png!
>  
> You can see how the watermarks disperse after the recovery. Trying to debug 
> the problem, I noticed that before the failure there would be calls to
>  
> {code:java}
> SourceCoordinator::announceCombinedWatermark() 
> {code}
> After the recovery, no calls get there, so no value for 
> {code:java}
> watermarkAlignmentParams.getMaxAllowedWatermarkDrift(){code}
> is ever read. I can manually fix the problem if I stop the job, clear all 
> state from Zookeeper and then manually start Flink providing the last 
> checkpoint with the 
> {code:java}
> '--fromSavepoint'{code}
>  flag. This causes the SourceCoordinator to be constructed properly and the 
> watermark drift to be checked. Once recovered manually, the watermarks again 
> converge to the allowed drift, as seen in the metrics:
> !image-2023-09-18-15-46-16-106.png!
>  
> Let me know If I can be helpful by providing any more information.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-32828:
--

Assignee: Piotr Nowojski

> Kafka partition aware watermark not handled correctly shortly after job start 
> up from checkpoint or savepoint
> -
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Major
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has events, just after job startup from 
> a savepoint/checkpoint. Once there are events on the other partitions, the 
> watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark across all partitions.
>  
> Steps to reproduce:
>  # Set up a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Startup job from last checkpoint/savepoint
> 7. Send events only on single partitions
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progressed*{color}
>  
> {color:#172b4d}To add a bit more context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on single partitions{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:16,099 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:19,122 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:18.114Z, watermark 
> 2023-08-10T12:54:44.103Z{color}}}
> {color:#172b4d}Expected: Watermark should progress a bit and then should not 
> progress when receivi

[jira] [Updated] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-32828:
---
Affects Version/s: 1.18.1
   1.19.0

> Kafka partition aware watermark not handled correctly shortly after job start 
> up from checkpoint or savepoint
> -
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1, 1.19.0, 1.18.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Critical
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has events, just after job startup from 
> a savepoint/checkpoint. Once there are events on the other partitions, the 
> watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark across all partitions.
>  
> Steps to reproduce:
>  # Set up a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Startup job from last checkpoint/savepoint
> 7. Send events only on single partitions
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progressed*{color}
>  
> {color:#172b4d}To add a bit more context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on single partitions{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:16,099 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:19,122 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:18.114Z, watermark 
> 2023-08-10T12:54:44.103Z{color}}}
> {color:#172b4d}Expected: Watermark should progress a bit a

[jira] [Updated] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-32828:
---
Priority: Critical  (was: Major)

> Kafka partition aware watermark not handled correctly shortly after job start 
> up from checkpoint or savepoint
> -
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Critical
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has events, just after job startup from 
> a savepoint/checkpoint. Once there are events on the other partitions, the 
> watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark across all partitions.
>  
> Steps to reproduce:
>  # Set up a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Startup job from last checkpoint/savepoint
> 7. Send events only on single partitions
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progressed*{color}
>  
> {color:#172b4d}To add a bit more context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on single partitions{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:16,099 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:19,122 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:18.114Z, watermark 
> 2023-08-10T12:54:44.103Z{color}}}
> {color:#172b4d}Expected: Watermark should progress a bit and then should not 
> progress when rec

[jira] [Commented] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846694#comment-17846694
 ] 

Piotr Nowojski commented on FLINK-32828:


We have just stumbled upon the same issue and I can confirm that it's quite a 
critical bug that can lead to all kinds of incorrect results.

The problem is that the initial splits recovered from state are registered in 
the {{WatermarkOutputMultiplexer}} only once the first record from that split 
has been emitted. As a result, the watermark was not properly combined from 
all the initial splits, but only from the splits that had already emitted at 
least one record.

I will publish a fix for that shortly.
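
The fix direction, roughly (a hypothetical sketch of the 
{{SourceOperator}}-side logic, not the actual patch; the exact 
{{registerNewOutput}} signature differs between Flink versions):

{code:java}
// Sketch only: register every split recovered from state in the multiplexer
// up-front, so that the combined watermark is the minimum over all restored
// splits, not just over the splits that have already emitted a record.
for (SourceSplit split : restoredSplits) {
    // before the fix, this registration happened lazily on the first record
    watermarkOutputMultiplexer.registerNewOutput(split.splitId());
}
{code}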

> Kafka partition aware watermark not handled correctly shortly after job start 
> up from checkpoint or savepoint
> -
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1, 1.19.0, 1.18.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Critical
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has events, just after job startup from 
> a savepoint/checkpoint. Once there are events on the other partitions, the 
> watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark across all partitions.
>  
> Steps to reproduce:
>  # Set up a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Startup job from last checkpoint/savepoint
> 7. Send events only on single partitions
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progressed*{color}
>  
> {color:#172b4d}To add a bit more context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on single partitions{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, ti

[jira] [Comment Edited] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846694#comment-17846694
 ] 

Piotr Nowojski edited comment on FLINK-32828 at 5/15/24 4:27 PM:
-

We have just stumbled upon the same issue and I can confirm that it's quite a 
critical bug that can lead to all kinds of incorrect results.

The problem is that the initial splits recovered from state are registered in 
the {{WatermarkOutputMultiplexer}} only once the first record from that split 
has been emitted. As a result, the watermark was not properly combined from 
all the initial splits, but only from the splits that had already emitted at 
least one record.

I will publish a fix for that shortly.

edit: Also, the problem does not affect only Kafka, but all FLIP-27 sources 
(anything that uses {{SourceOperator}}).


was (Author: pnowojski):
We have just stumbled upon the same issue and I can confirm that it's quite a 
critical bug that can lead to all kinds of incorrect results.

The problem is that the initial splits recovered from state are registered in 
the {{WatermarkOutputMultiplexer}} only once the first record from that split 
has been emitted. As a result, the watermark was not properly combined from 
all the initial splits, but only from the splits that had already emitted at 
least one record.

I will publish a fix for that shortly.

> Kafka partition aware watermark not handled correctly shortly after job start 
> up from checkpoint or savepoint
> -
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1, 1.19.0, 1.18.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Critical
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has events, just after job startup from 
> a savepoint/checkpoint. Once there are events on the other partitions, the 
> watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark across all partitions.
>  
> Steps to reproduce:
>  # Set up a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Startup job from last checkpoint/savepoint
> 7. Send events only on single partitions
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progressed*{color}
>  
> {color:#172b4d}To add a bit more context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on sin

[jira] [Updated] (FLINK-32828) Partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-32828:
---
Summary: Partition aware watermark not handled correctly shortly after job 
start up from checkpoint or savepoint  (was: Kafka partition aware watermark 
not handled correctly shortly after job start up from checkpoint or savepoint)

> Partition aware watermark not handled correctly shortly after job start up 
> from checkpoint or savepoint
> ---
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1, 1.19.0, 1.18.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Critical
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has events, just after job startup from 
> a savepoint/checkpoint. Once there are events on the other partitions, the 
> watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark across all partitions.
>  
> Steps to reproduce:
>  # Set up a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L))`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start the job
>  # Send events only on a single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Start the job from the last checkpoint/savepoint
> 7. Send events only on a single partition
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progressed*{color}
>  
> {color:#172b4d}To add a bit more context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on a single partition{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:16,099 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:19,122 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Receiv
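For illustration, here is a minimal sketch of the kind of job the steps above 
describe. It is only an approximation of the attached test-job.java: the topic, 
bootstrap servers, parallelism and watermark strategy come from the 
reproduction steps, while the class name, group id and sink are assumed:

{code:java}
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WatermarkReproSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("test-2")
                .setGroupId("watermark-repro") // assumed group id
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(
                        source,
                        // Per-partition watermark generation; the emitted watermark
                        // should be the minimum over all assigned partitions.
                        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10L)),
                        "kafka-source")
                .setParallelism(2) // lower than the 4 partitions, as in the steps
                .print();

        env.execute("watermark-repro-sketch");
    }
}
{code}

The minimum-over-all-partitions rule is exactly the invariant the logs above 
show being violated right after restoring from a checkpoint/savepoint.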

[jira] [Commented] (FLINK-35351) Restore from unaligned checkpoints with a custom partitioner fails.

2024-05-16 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846869#comment-17846869
 ] 

Piotr Nowojski commented on FLINK-35351:


Thanks, after reading the PR I indeed understand this better :)

> Restore from unaligned checkpoints with a custom partitioner fails.
> ---
>
> Key: FLINK-35351
> URL: https://issues.apache.org/jira/browse/FLINK-35351
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Dmitriy Linevich
>Assignee: Dmitriy Linevich
>Priority: Major
>  Labels: pull-request-available
>
> We encountered a problem when using a custom partitioner with unaligned 
> checkpoints. The bug reproduces under the following steps:
>  # Run a job with graph: Source[2]->Sink[3], the custom partitioner applied 
> after the Source task.
>  # Make a checkpoint.
>  # Restore from the checkpoint with a different source parallelism: 
> Source[1]->Sink[3].
>  # An exception is thrown.
> This issue does not occur when restoring with the same parallelism or when 
> changing the Sink parallelism. The exception only occurs when the parallelism 
> of the Source is changed while the Sink parallelism remains the same.
> See the exception below and the test code at the end. 
> {code:java}
> [db13789c52b80aad852c53a0afa26247] Task [Sink: sink (3/3)#0] WARN  Sink: sink 
> (3/3)#0 
> (be1d158c2e77fc9ed9e3e5d9a8431dc2_0a448493b4782967b150582570326227_2_0) 
> switched from RUNNING to FAILED with failure cause:
> java.io.IOException: Can't get next record for channel 
> InputChannelInfo{gateIdx=0, inputChannelIdx=0}
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:600)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:930)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:879)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:960)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) 
> [classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:753) 
> [classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) 
> [classes/:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> Caused by: java.io.IOException: Corrupt stream, found tag: -1
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:222)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:44)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer$VirtualChannel.getNextRecord(DemultiplexingRecordDeserializer.java:79)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:154)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:54)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:103)
>  ~[classes/:?]
>     ... 10 more {code}
> We discovered that th
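For reference, a minimal sketch of the failing topology shape 
(Source[2] -> custom partitioner -> Sink[3]). Only the shape and parallelisms 
follow the reproduction steps; the partitioner, key selector and data are 
assumptions:

{code:java}
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CustomPartitionerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromSequence(0L, 1000L)
                .setParallelism(2) // Source[2]; restoring with parallelism 1 triggers the bug
                .partitionCustom(
                        // Hypothetical partitioner standing in for the custom one.
                        (Partitioner<Long>) (key, numPartitions) -> (int) (key % numPartitions),
                        (KeySelector<Long, Long>) value -> value)
                .print()
                .setParallelism(3); // Sink[3], unchanged across the restore

        env.execute("custom-partitioner-sketch");
    }
}
{code}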

[jira] [Closed] (FLINK-32828) Partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-17 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-32828.
--
Fix Version/s: 2.0.0
   1.18.2
   1.20.0
   1.19.1
   Resolution: Fixed

Merged to master as 7fd6c6da26c
Merged to release-1.19 as a160f8db1f7
Merged to release-1.18 as 4848a50d735

> Partition aware watermark not handled correctly shortly after job start up 
> from checkpoint or savepoint
> ---
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1, 1.19.0, 1.18.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.18.2, 1.20.0, 1.19.1
>
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has some events, just after job startup 
> from a savepoint/checkpoint. After there have been some events on the other 
> partitions, the watermark behaviour is correct and the watermark is emitted 
> as the minimum watermark from all partitions.
>  
> Steps to reproduce:
>  # Set up a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L))`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start the job
>  # Send events only on a single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Start the job from the last checkpoint/savepoint
> 7. Send events only on a single partition
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progressed*{color}
>  
> {color:#172b4d}To add a bit more context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on a single partition{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:16,099 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 
> 2023-08-10T12:

[jira] [Created] (FLINK-35420) WordCountMapredITCase fails to compile in IntelliJ

2024-05-22 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35420:
--

 Summary: WordCountMapredITCase fails to compile in IntelliJ
 Key: FLINK-35420
 URL: https://issues.apache.org/jira/browse/FLINK-35420
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hadoop Compatibility
Affects Versions: 1.20.0
Reporter: Piotr Nowojski


{noformat}
flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala:42:8
value isFalse is not a member of ?0
possible cause: maybe a semicolon is missing before `value isFalse'?
  .isFalse()
{noformat}

Might be caused by:
https://youtrack.jetbrains.com/issue/SCL-20679




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35420) WordCountMapredITCase fails to compile in IntelliJ

2024-05-23 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-35420.
--
Resolution: Fixed

Merged commit 9ccfb65 into apache:master now

> WordCountMapredITCase fails to compile in IntelliJ
> --
>
> Key: FLINK-35420
> URL: https://issues.apache.org/jira/browse/FLINK-35420
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hadoop Compatibility
>Affects Versions: 1.20.0
>Reporter: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> {noformat}
> flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala:42:8
> value isFalse is not a member of ?0
> possible cause: maybe a semicolon is missing before `value isFalse'?
>   .isFalse()
> {noformat}
> Might be caused by:
> https://youtrack.jetbrains.com/issue/SCL-20679



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35420) WordCountMapredITCase fails to compile in IntelliJ

2024-05-23 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-35420:
--

Assignee: Piotr Nowojski

> WordCountMapredITCase fails to compile in IntelliJ
> --
>
> Key: FLINK-35420
> URL: https://issues.apache.org/jira/browse/FLINK-35420
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hadoop Compatibility
>Affects Versions: 1.20.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> {noformat}
> flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala:42:8
> value isFalse is not a member of ?0
> possible cause: maybe a semicolon is missing before `value isFalse'?
>   .isFalse()
> {noformat}
> Might be caused by:
> https://youtrack.jetbrains.com/issue/SCL-20679



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35420) WordCountMapredITCase fails to compile in IntelliJ

2024-05-23 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-35420:
---
Fix Version/s: 1.20.0

> WordCountMapredITCase fails to compile in IntelliJ
> --
>
> Key: FLINK-35420
> URL: https://issues.apache.org/jira/browse/FLINK-35420
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hadoop Compatibility
>Affects Versions: 1.20.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> {noformat}
> flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala:42:8
> value isFalse is not a member of ?0
> possible cause: maybe a semicolon is missing before `value isFalse'?
>   .isFalse()
> {noformat}
> Might be caused by:
> https://youtrack.jetbrains.com/issue/SCL-20679



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35351) Restore from unaligned checkpoints with a custom partitioner fails.

2024-05-31 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-35351.
--
Fix Version/s: 1.18.2
   1.20.0
   1.19.1
   Resolution: Fixed

merged to master as ce0b61f376b and ce0b61f376b^
merged to release-1.19 as 551f4ae7dde and 551f4ae7dde^
merged to release-1.18 as 70f775e7ba1 and 70f775e7ba1^

> Restore from unaligned checkpoints with a custom partitioner fails.
> ---
>
> Key: FLINK-35351
> URL: https://issues.apache.org/jira/browse/FLINK-35351
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Dmitriy Linevich
>Assignee: Dmitriy Linevich
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
>
> We encountered a problem when using a custom partitioner with unaligned 
> checkpoints. The bug reproduces under the following steps:
>  # Run a job with graph: Source[2]->Sink[3], the custom partitioner applied 
> after the Source task.
>  # Make a checkpoint.
>  # Restore from the checkpoint with a different source parallelism: 
> Source[1]->Sink[3].
>  # An exception is thrown.
> This issue does not occur when restoring with the same parallelism or when 
> changing the Sink parallelism. The exception only occurs when the parallelism 
> of the Source is changed while the Sink parallelism remains the same.
> See the exception below and the test code at the end. 
> {code:java}
> [db13789c52b80aad852c53a0afa26247] Task [Sink: sink (3/3)#0] WARN  Sink: sink 
> (3/3)#0 
> (be1d158c2e77fc9ed9e3e5d9a8431dc2_0a448493b4782967b150582570326227_2_0) 
> switched from RUNNING to FAILED with failure cause:
> java.io.IOException: Can't get next record for channel 
> InputChannelInfo{gateIdx=0, inputChannelIdx=0}
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:600)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:930)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:879)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:960)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) 
> [classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:753) 
> [classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) 
> [classes/:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> Caused by: java.io.IOException: Corrupt stream, found tag: -1
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:222)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:44)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer$VirtualChannel.getNextRecord(DemultiplexingRecordDeserializer.java:79)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:154)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:54)
>  ~[classes/:?]
>     a

[jira] [Commented] (FLINK-28260) flink-runtime-web fails to execute "npm ci" on Apple Silicon (arm64) without rosetta

2024-05-31 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850963#comment-17850963
 ] 

Piotr Nowojski commented on FLINK-28260:


merged commit d075c9f into apache:master

> flink-runtime-web fails to execute "npm ci" on Apple Silicon (arm64) without 
> rosetta
> 
>
> Key: FLINK-28260
> URL: https://issues.apache.org/jira/browse/FLINK-28260
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Runtime / Web Frontend
>Reporter: Robert Metzger
>Priority: Major
>
> Flink 1.16-SNAPSHOT fails to build in the flink-runtime-web project because 
> we are using an outdated frontend-maven-plugin (v 1.11.3).
> This is the error:
> {code}
> [ERROR] Failed to execute goal 
> com.github.eirslett:frontend-maven-plugin:1.11.3:npm (npm install) on project 
> flink-runtime-web: Failed to run task: 'npm ci --cache-max=0 --no-save 
> ${npm.proxy}' failed. java.io.IOException: Cannot run program 
> "/Users/rmetzger/Projects/flink/flink-runtime-web/web-dashboard/node/node" 
> (in directory 
> "/Users/rmetzger/Projects/flink/flink-runtime-web/web-dashboard"): error=86, 
> Bad CPU type in executable -> [Help 1]
> {code}
> Using the latest frontend-maven-plugin (v. 1.12.1) properly passes the build, 
> because this version downloads the proper arm64 npm version. However, 
> frontend-maven-plugin 1.12.1 requires Maven 3.6.0, which is too high for 
> Flink (highest mvn version for Flink is 3.2.5).
> The best workaround is using rosetta on M1 Macs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-28260) flink-runtime-web fails to execute "npm ci" on Apple Silicon (arm64) without rosetta

2024-05-31 Thread Piotr Nowojski (Jira)


[ https://issues.apache.org/jira/browse/FLINK-28260 ]


Piotr Nowojski deleted comment on FLINK-28260:


was (Author: pnowojski):
merged commit d075c9f into apache:master

> flink-runtime-web fails to execute "npm ci" on Apple Silicon (arm64) without 
> rosetta
> 
>
> Key: FLINK-28260
> URL: https://issues.apache.org/jira/browse/FLINK-28260
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Runtime / Web Frontend
>Reporter: Robert Metzger
>Priority: Major
>
> Flink 1.16-SNAPSHOT fails to build in the flink-runtime-web project because 
> we are using an outdated frontend-maven-plugin (v 1.11.3).
> This is the error:
> {code}
> [ERROR] Failed to execute goal 
> com.github.eirslett:frontend-maven-plugin:1.11.3:npm (npm install) on project 
> flink-runtime-web: Failed to run task: 'npm ci --cache-max=0 --no-save 
> ${npm.proxy}' failed. java.io.IOException: Cannot run program 
> "/Users/rmetzger/Projects/flink/flink-runtime-web/web-dashboard/node/node" 
> (in directory 
> "/Users/rmetzger/Projects/flink/flink-runtime-web/web-dashboard"): error=86, 
> Bad CPU type in executable -> [Help 1]
> {code}
> Using the latest frontend-maven-plugin (v. 1.12.1) properly passes the build, 
> because this version downloads the proper arm64 npm version. However, 
> frontend-maven-plugin 1.12.1 requires Maven 3.6.0, which is too high for 
> Flink (highest mvn version for Flink is 3.2.5).
> The best workaround is using rosetta on M1 Macs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32142) Apple Silicon Support: Unable to Build Flink Project due to "Bad CPU Type" Error

2024-05-31 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-32142.
--
Resolution: Fixed

merged commit d075c9f into apache:master

> Apple Silicon Support: Unable to Build Flink Project due to "Bad CPU Type" 
> Error
> 
>
> Key: FLINK-32142
> URL: https://issues.apache.org/jira/browse/FLINK-32142
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Runtime / Web Frontend
>Affects Versions: 1.15.0, 1.15.1, 1.16.0, 1.17.0, 1.15.2, 1.15.3, 1.16.1, 
> 1.15.4, 1.16.2, 1.18.0, 1.17.1, 1.15.5, 1.16.3, 1.17.2
> Environment: Apple Silicon architecture (M2 Pro)
> macOS Ventura (Version 13.3.1)
>Reporter: Elphas Toringepi
>Assignee: Elphas Toringepi
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.20.0
>
>
> Attempting to build the Flink project on Apple Silicon architecture results 
> in an error related to the execution of the frontend-maven-plugin.
> The error message indicates that the plugin fails to run the 
> "flink/flink-runtime-web/web-dashboard/node/node" program due to a "Bad CPU 
> type in executable" error.
> {code:java}
> [ERROR] Failed to execute goal 
> com.github.eirslett:frontend-maven-plugin:1.11.0:npm (npm install) on project 
> flink-runtime-web: Failed to run task: 'npm ci --cache-max=0 --no-save 
> ${npm.proxy}' failed. java.io.IOException: Cannot run program 
> "flink/flink-runtime-web/web-dashboard/node/node" (in directory 
> "flink/flink-runtime-web/web-dashboard"): error=86, Bad CPU type in 
> executable{code}
> Steps to Reproduce:
>  # Clone the Flink project repository. 
>  # Attempt to build the project on an Apple Silicon device.
>  # Observe the error message mentioned above.
> {code:java}
> git clone https://github.com/apache/flink.git
> cd flink
> ./mvnw clean package -DskipTests
> {code}
> Proposed Solution
> Upgrade frontend-maven-plugin from version 1.11.0 to the latest version, 
> 1.12.1.
> frontend-maven-plugin version 1.11.0 downloads x64 binaries 
> node-v16.13.2-darwin-x64.tar.gz instead of the arm64 binaries.
> Support for arm64 has been available for frontend-maven-plugin since version 
> 2. [https://github.com/eirslett/frontend-maven-plugin/pull/970]
> {code:java}
> [DEBUG] Executing command line 
> [/Users/elphas/src/flink/flink-runtime-web/web-dashboard/node/node, 
> --version] [INFO] Installing node version v16.13.2 [DEBUG] Creating temporary 
> directory /flink/flink-runtime-web/web-dashboard/node/tmp [INFO] Unpacking 
> ~/.m2/repository/com/github/eirslett/node/16.13.2/node-16.13.2-darwin-x64.tar.gz
>  into flink/flink-runtime-web/web-dashboard/node/tmp{code}
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30719) flink-runtime-web failed due to a corrupted nodejs dependency

2024-05-31 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850965#comment-17850965
 ] 

Piotr Nowojski commented on FLINK-30719:


{noformat}
14:47:52.892 [ERROR] Failed to execute goal 
com.github.eirslett:frontend-maven-plugin:1.11.0:install-node-and-npm (install 
node and npm) on project flink-runtime-web: Could not extract the Node archive: 
Could not extract archive: 
'/__w/3/.m2/repository/com/github/eirslett/node/16.13.2/node-16.13.2-linux-x64.tar.gz':
 EOFException -> [Help 1]
1
{noformat}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59951&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb


> flink-runtime-web failed due to a corrupted nodejs dependency
> -
>
> Key: FLINK-30719
> URL: https://issues.apache.org/jira/browse/FLINK-30719
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend, Test Infrastructure, Tests
>Affects Versions: 1.16.0, 1.17.0, 1.18.0
>Reporter: Matthias Pohl
>Assignee: Sergey Nuyanzin
>Priority: Critical
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44954&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb&l=12550
> The build failed due to a corrupted nodejs dependency:
> {code}
> [ERROR] The archive file 
> /__w/1/.m2/repository/com/github/eirslett/node/16.13.2/node-16.13.2-linux-x64.tar.gz
>  is corrupted and will be deleted. Please try the build again.
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30719) flink-runtime-web failed due to a corrupted nodejs dependency

2024-05-31 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850965#comment-17850965
 ] 

Piotr Nowojski edited comment on FLINK-30719 at 5/31/24 8:11 AM:
-

{code:java}
14:47:52.892 [ERROR] Failed to execute goal 
com.github.eirslett:frontend-maven-plugin:1.11.0:install-node-and-npm (install 
node and npm) on project flink-runtime-web: Could not extract the Node archive: 
Could not extract archive: 
'/__w/3/.m2/repository/com/github/eirslett/node/16.13.2/node-16.13.2-linux-x64.tar.gz':
 EOFException -> [Help 1]
1
{code}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59951&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb



was (Author: pnowojski):
{noformat}
14:47:52.892 [ERROR] Failed to execute goal 
com.github.eirslett:frontend-maven-plugin:1.11.0:install-node-and-npm (install 
node and npm) on project flink-runtime-web: Could not extract the Node archive: 
Could not extract archive: 
'/__w/3/.m2/repository/com/github/eirslett/node/16.13.2/node-16.13.2-linux-x64.tar.gz':
 EOFException -> [Help 1]
1
{noformat}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59951&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb


> flink-runtime-web failed due to a corrupted nodejs dependency
> -
>
> Key: FLINK-30719
> URL: https://issues.apache.org/jira/browse/FLINK-30719
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend, Test Infrastructure, Tests
>Affects Versions: 1.16.0, 1.17.0, 1.18.0
>Reporter: Matthias Pohl
>Assignee: Sergey Nuyanzin
>Priority: Critical
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44954&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb&l=12550
> The build failed due to a corrupted nodejs dependency:
> {code}
> [ERROR] The archive file 
> /__w/1/.m2/repository/com/github/eirslett/node/16.13.2/node-16.13.2-linux-x64.tar.gz
>  is corrupted and will be deleted. Please try the build again.
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35518) CI Bot doesn't run on PRs

2024-06-04 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35518:
--

 Summary: CI Bot doesn't run on PRs
 Key: FLINK-35518
 URL: https://issues.apache.org/jira/browse/FLINK-35518
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Piotr Nowojski


The CI bot doesn't want to run on my PR/branch. I tried force-pushes, rebases, 
asking the flink bot to run, and closing and opening a new PR, but nothing 
helped:
https://github.com/apache/flink/pull/24868
https://github.com/apache/flink/pull/24883

I've heard others were having similar problems recently.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35518) CI Bot doesn't run on PRs - status UNKNOWN

2024-06-04 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-35518:
---
Summary: CI Bot doesn't run on PRs - status UNKNOWN  (was: CI Bot doesn't 
run on PRs)

> CI Bot doesn't run on PRs - status UNKNOWN
> --
>
> Key: FLINK-35518
> URL: https://issues.apache.org/jira/browse/FLINK-35518
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Piotr Nowojski
>Priority: Critical
>
> The CI bot doesn't want to run on my PR/branch. I tried force-pushes, 
> rebases, asking the flink bot to run, and closing and opening a new PR, but 
> nothing helped:
> https://github.com/apache/flink/pull/24868
> https://github.com/apache/flink/pull/24883
> I've heard others were having similar problems recently.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35528) Skip execution of interruptible mails when yielding

2024-06-05 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35528:
--

 Summary: Skip execution of interruptible mails when yielding
 Key: FLINK-35528
 URL: https://issues.apache.org/jira/browse/FLINK-35528
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing, Runtime / Task
Affects Versions: 1.20.0
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski


When operators are yielding, for example waiting for async state access to 
complete before a checkpoint, it would be beneficial to not execute 
interruptible mails. Otherwise the continuation mail for firing timers would 
be continuously re-enqueued. To achieve that, the MailboxExecutor must be 
aware of which mails are interruptible.

The easiest way to achieve this is to set MIN_PRIORITY for interruptible mails.
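As a toy illustration of the proposed mechanism (this is not the actual 
MailboxExecutor/TaskMailbox API, just a sketch of the skip-on-yield idea):

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Toy mailbox: yielding drains only mails above MIN_PRIORITY, so interruptible
// mails (e.g. the continuation mail for firing timers) are not executed and
// re-enqueued in a loop while an operator is yielding.
final class ToyMailbox {
    static final int MIN_PRIORITY = Integer.MIN_VALUE;

    private static final class Mail {
        final Runnable action;
        final int priority;

        Mail(Runnable action, int priority) {
            this.action = action;
            this.priority = priority;
        }
    }

    private final Deque<Mail> queue = new ArrayDeque<>();

    void submit(Runnable action, int priority) {
        queue.addLast(new Mail(action, priority));
    }

    /** Called while yielding: runs one non-interruptible mail, if there is one. */
    boolean tryYield() {
        Iterator<Mail> it = queue.iterator();
        while (it.hasNext()) {
            Mail mail = it.next();
            if (mail.priority > MIN_PRIORITY) {
                it.remove();
                mail.action.run();
                return true;
            }
        }
        return false; // only interruptible mails are left; nothing to do
    }
}
{code}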



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-20217) More fine-grained timer processing

2024-06-12 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854413#comment-17854413
 ] 

Piotr Nowojski commented on FLINK-20217:


Merged to master as 503593ab2d4..986d06d2cd7

> More fine-grained timer processing
> --
>
> Key: FLINK-20217
> URL: https://issues.apache.org/jira/browse/FLINK-20217
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Runtime / Task
>Affects Versions: 1.10.2, 1.11.2, 1.12.0
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>
> Timers are currently processed in one big block under the checkpoint lock 
> (under {{InternalTimerServiceImpl#advanceWatermark}}). This can be problematic 
> in a number of scenarios while doing checkpointing which would lead to 
> checkpoints timing out (and even unaligned checkpoints would not help).
> If you have a huge number of timers to process when advancing the watermark 
> and the task is also back-pressured, the situation may actually be worse 
> since you would block on the checkpoint lock and also wait for 
> buffers/credits from the receiver.
> I propose to make this loop more fine-grained so that it is interruptible by 
> checkpoints, but maybe there is also some other way to improve here.
> This issue has been for example observed here: 
> https://lists.apache.org/thread/f6ffk9912fg5j1rfkxbzrh0qmp4w6qry
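As a rough sketch of the idea behind the change (all names here are made up 
for illustration; the real implementation lives in the timer service and 
mailbox code):

{code:java}
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.function.BooleanSupplier;

// Toy sketch: fire due timers one by one and bail out when a checkpoint wants
// to run, instead of draining the whole queue in one uninterruptible block.
final class InterruptibleTimerLoop {
    private final Queue<Long> timers = new PriorityQueue<>();

    void register(long timestamp) {
        timers.add(timestamp);
    }

    /** Returns false if interrupted; the caller re-enqueues a continuation. */
    boolean advanceWatermark(long watermark, BooleanSupplier shouldInterrupt) {
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            if (shouldInterrupt.getAsBoolean()) {
                return false; // let the checkpoint go through first
            }
            fire(timers.poll());
        }
        return true;
    }

    private void fire(long timestamp) {
        // the user's timer callback would run here
    }
}
{code}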



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35528) Skip execution of interruptible mails when yielding

2024-06-12 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-35528.
--
Fix Version/s: 1.20.0
   Resolution: Fixed

merged commit 77cd1cd into apache:master now

> Skip execution of interruptible mails when yielding
> ---
>
> Key: FLINK-35528
> URL: https://issues.apache.org/jira/browse/FLINK-35528
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Task
>Affects Versions: 1.20.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> When operators are yielding, for example waiting for async state access to 
> complete before a checkpoint, it would be beneficial to not execute 
> interruptible mails. Otherwise the continuation mail for firing timers would 
> be continuously re-enqueued. To achieve that, the MailboxExecutor must be 
> aware of which mails are interruptible.
> The easiest way to achieve this is to set MIN_PRIORITY for interruptible 
> mails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-20217) More fine-grained timer processing

2024-06-12 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-20217.
--
Fix Version/s: 1.20.0
   Resolution: Fixed

Firing timers can now be interrupted to speed up checkpointing. Timers that 
were interrupted by a checkpoint will be fired shortly after the checkpoint 
completes. By default this feature is disabled. To enable it, please set:

execution.checkpointing.unaligned.interruptible-timers.enabled

to true.

> More fine-grained timer processing
> --
>
> Key: FLINK-20217
> URL: https://issues.apache.org/jira/browse/FLINK-20217
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Runtime / Task
>Affects Versions: 1.10.2, 1.11.2, 1.12.0
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
> Fix For: 1.20.0
>
>
> Timers are currently processed in one big block under the checkpoint lock 
> (under {{InternalTimerServiceImpl#advanceWatermark}}). This can be problematic 
> in a number of scenarios while doing checkpointing which would lead to 
> checkpoints timing out (and even unaligned checkpoints would not help).
> If you have a huge number of timers to process when advancing the watermark 
> and the task is also back-pressured, the situation may actually be worse 
> since you would block on the checkpoint lock and also wait for 
> buffers/credits from the receiver.
> I propose to make this loop more fine-grained so that it is interruptible by 
> checkpoints, but maybe there is also some other way to improve here.
> This issue has been for example observed here: 
> https://lists.apache.org/thread/f6ffk9912fg5j1rfkxbzrh0qmp4w6qry



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (FLINK-20217) More fine-grained timer processing

2024-06-12 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reopened FLINK-20217:


> More fine-grained timer processing
> --
>
> Key: FLINK-20217
> URL: https://issues.apache.org/jira/browse/FLINK-20217
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Runtime / Task
>Affects Versions: 1.10.2, 1.11.2, 1.12.0
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
> Fix For: 1.20.0
>
>
> Timers are currently processed in one big block under the checkpoint lock 
> (under {{InternalTimerServiceImpl#advanceWatermark}}). This can be problematic 
> in a number of scenarios while doing checkpointing which would lead to 
> checkpoints timing out (and even unaligned checkpoints would not help).
> If you have a huge number of timers to process when advancing the watermark 
> and the task is also back-pressured, the situation may actually be worse 
> since you would block on the checkpoint lock and also wait for 
> buffers/credits from the receiver.
> I propose to make this loop more fine-grained so that it is interruptible by 
> checkpoints, but maybe there is also some other way to improve here.
> This issue has been for example observed here: 
> https://lists.apache.org/thread/f6ffk9912fg5j1rfkxbzrh0qmp4w6qry



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-20217) More fine-grained timer processing

2024-06-12 Thread Piotr Nowojski (Jira)


[ https://issues.apache.org/jira/browse/FLINK-20217 ]


Piotr Nowojski deleted comment on FLINK-20217:


was (Author: pnowojski):
Firing timers can now be interrupted to speed up checkpointing. Timers that 
were interrupted by a checkpoint will be fired shortly after the checkpoint 
completes. By default this feature is disabled. To enable it, please set:

execution.checkpointing.unaligned.interruptible-timers.enabled

to true.

> More fine-grained timer processing
> --
>
> Key: FLINK-20217
> URL: https://issues.apache.org/jira/browse/FLINK-20217
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Runtime / Task
>Affects Versions: 1.10.2, 1.11.2, 1.12.0
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
> Fix For: 1.20.0
>
>
> Timers are currently processed in one big block under the checkpoint lock 
> (under {{InternalTimerServiceImpl#advanceWatermark}}). This can be problematic 
> in a number of scenarios while doing checkpointing which would lead to 
> checkpoints timing out (and even unaligned checkpoints would not help).
> If you have a huge number of timers to process when advancing the watermark 
> and the task is also back-pressured, the situation may actually be worse 
> since you would block on the checkpoint lock and also wait for 
> buffers/credits from the receiver.
> I propose to make this loop more fine-grained so that it is interruptible by 
> checkpoints, but maybe there is also some other way to improve here.
> This issue has been for example observed here: 
> https://lists.apache.org/thread/f6ffk9912fg5j1rfkxbzrh0qmp4w6qry



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-20217) More fine-grained timer processing

2024-06-12 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-20217.
--
Release Note: 
Firing timers can now be interrupted to speed up checkpointing. Timers that 
were interrupted by a checkpoint will be fired shortly after the checkpoint 
completes. By default this feature is disabled. To enable it, please set:

execution.checkpointing.unaligned.interruptible-timers.enabled

to true. Currently this is supported only by all TableStreamOperators and the 
CepOperator.
  Resolution: Fixed
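For example, enabling it programmatically would look along these lines (a 
sketch; the option name is taken from the release note above, please verify it 
against the documentation of your version):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableInterruptibleTimers {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Option name as given in the release note.
        conf.setString("execution.checkpointing.unaligned.interruptible-timers.enabled", "true");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the job as usual
    }
}
{code}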

> More fine-grained timer processing
> --
>
> Key: FLINK-20217
> URL: https://issues.apache.org/jira/browse/FLINK-20217
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Runtime / Task
>Affects Versions: 1.10.2, 1.11.2, 1.12.0
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
> Fix For: 1.20.0
>
>
> Timers are currently processed in one big block under the checkpoint lock 
> (under {{InternalTimerServiceImpl#advanceWatermark}}). This can be problematic 
> in a number of scenarios while doing checkpointing which would lead to 
> checkpoints timing out (and even unaligned checkpoints would not help).
> If you have a huge number of timers to process when advancing the watermark 
> and the task is also back-pressured, the situation may actually be worse 
> since you would block on the checkpoint lock and also wait for 
> buffers/credits from the receiver.
> I propose to make this loop more fine-grained so that it is interruptible by 
> checkpoints, but maybe there is also some other way to improve here.
> This issue has been for example observed here: 
> https://lists.apache.org/thread/f6ffk9912fg5j1rfkxbzrh0qmp4w6qry



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34252) WatermarkAssignerOperator should not emit WatermarkStatus.IDLE under continuous data flow

2024-06-14 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-34252:
--

Assignee: Piotr Nowojski  (was: David Christle)

> WatermarkAssignerOperator should not emit WatermarkStatus.IDLE under 
> continuous data flow
> -
>
> Key: FLINK-34252
> URL: https://issues.apache.org/jira/browse/FLINK-34252
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.3, 1.17.2, 1.18.1
>Reporter: David Christle
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0
>
>
> The WatermarkAssignerOperator in the table runtime incorrectly transitions to 
> an IDLE state even when data is continuously flowing. This behavior, observed 
> under normal operating conditions where the interval between data elements is 
> shorter than the configured idleTimeout, leads to regular transitions between 
> ACTIVE and IDLE states, which are unnecessary.
> _Detail:_
> In the current implementation, the lastRecordTime variable, which tracks the 
> time of the last received data element, is updated only when the 
> WatermarkStatus transitions from IDLE to ACTIVE. However, it is not updated 
> when WatermarkStatus is ACTIVE, which means even under continuous data flow, 
> the condition `(currentTime - lastRecordTime > idleTimeout)` will eventually 
> always become true, and the WatermarkStatus will erroneously be marked IDLE. 
> It is unclear to me if this bug produces any incorrectness downstream, since 
> when the WatermarkStatus is in the IDLE state, the next processElement 
> will cause a WatermarkStatus.ACTIVE to be emitted. Nevertheless, we should 
> eliminate this flip-flop behavior between states.
> The test I wrote fails without the fix and illustrates the flip-flops:
> {noformat}
> [ERROR] Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.030 
> s <<< FAILURE! -- in 
> org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest
> [ERROR] 
> org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest.testIdleStateAvoidanceWithConsistentDataFlow
>  -- Time elapsed: 0.013 s <<< FAILURE!
> java.lang.AssertionError:
> Expecting
>   [WatermarkStatus(IDLE),
> WatermarkStatus(ACTIVE),
> WatermarkStatus(IDLE),
> WatermarkStatus(ACTIVE),
> WatermarkStatus(IDLE),
> WatermarkStatus(ACTIVE),
> WatermarkStatus(IDLE),
> WatermarkStatus(ACTIVE),
> WatermarkStatus(IDLE)]
> not to contain
>   [WatermarkStatus(IDLE)]
> but found
>   [WatermarkStatus(IDLE)]
> {noformat}
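A toy model of the state machine described above (names are illustrative, not 
the actual WatermarkAssignerOperator code):

{code:java}
// Minimal model of the described bug and fix: the buggy version refreshed
// lastRecordTime only on the IDLE -> ACTIVE transition, so continuous input
// still tripped the idle timeout; the fix refreshes it on every element.
final class IdleStateModel {
    enum Status { ACTIVE, IDLE }

    private final long idleTimeout;
    private long lastRecordTime;
    private Status status = Status.ACTIVE;

    IdleStateModel(long idleTimeout, long now) {
        this.idleTimeout = idleTimeout;
        this.lastRecordTime = now;
    }

    Status onElement(long now) {
        status = Status.ACTIVE;
        lastRecordTime = now; // the fix: update on every element, unconditionally
        return status;
    }

    Status onIdlenessCheck(long now) {
        if (status == Status.ACTIVE && now - lastRecordTime > idleTimeout) {
            status = Status.IDLE;
        }
        return status;
    }
}
{code}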



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34252) WatermarkAssignerOperator should not emit WatermarkStatus.IDLE under continuous data flow

2024-06-18 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17855830#comment-17855830
 ] 

Piotr Nowojski commented on FLINK-34252:


Merged to master as d93e92c665a and d93e92c665a^

> WatermarkAssignerOperator should not emit WatermarkStatus.IDLE under 
> continuous data flow
> -
>
> Key: FLINK-34252
> URL: https://issues.apache.org/jira/browse/FLINK-34252
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.3, 1.17.2, 1.18.1
>Reporter: David Christle
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0
>
>
> The WatermarkAssignerOperator in the table runtime incorrectly transitions to 
> an IDLE state even when data is continuously flowing. This behavior, observed 
> under normal operating conditions where the interval between data elements is 
> shorter than the configured idleTimeout, leads to regular transitions between 
> ACTIVE and IDLE states, which are unnecessary.
> _Detail:_
> In the current implementation, the lastRecordTime variable, which tracks the 
> time of the last received data element, is updated only when the 
> WatermarkStatus transitions from IDLE to ACTIVE. However, it is not updated 
> when WatermarkStatus is ACTIVE, which means even under continuous data flow, 
> the condition `(currentTime - lastRecordTime > idleTimeout)` will eventually 
> always become true, and the WatermarkStatus will erroneously be marked IDLE. 
> It is unclear to me if this bug produces any incorrectness downstream, since 
> when the WatermarkStatus is in the IDLE state, the next processElement 
> will cause a WatermarkStatus.ACTIVE to be emitted. Nevertheless, we should 
> eliminate this flip-flop behavior between states.
> The test I wrote fails without the fix and illustrates the flip-flops:
> {noformat}
> [ERROR] Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.030 
> s <<< FAILURE! -- in 
> org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest
> [ERROR] 
> org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest.testIdleStateAvoidanceWithConsistentDataFlow
>  -- Time elapsed: 0.013 s <<< FAILURE!
> java.lang.AssertionError:
> Expecting
>   [WatermarkStatus(IDLE),
> WatermarkStatus(ACTIVE),
> WatermarkStatus(IDLE),
> WatermarkStatus(ACTIVE),
> WatermarkStatus(IDLE),
> WatermarkStatus(ACTIVE),
> WatermarkStatus(IDLE),
> WatermarkStatus(ACTIVE),
> WatermarkStatus(IDLE)]
> not to contain
>   [WatermarkStatus(IDLE)]
> but found
>   [WatermarkStatus(IDLE)]
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34252) WatermarkAssignerOperator should not emit WatermarkStatus.IDLE under continuous data flow

2024-06-18 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-34252.
--
Fix Version/s: 1.19.2
   Resolution: Fixed

merged to release-1.18 as b60318ba1a7^ and b60318ba1a7
merged to release-1.19 as c5dfb57fd94^ and c5dfb57fd94


> WatermarkAssignerOperator should not emit WatermarkStatus.IDLE under 
> continuous data flow
> -
>
> Key: FLINK-34252
> URL: https://issues.apache.org/jira/browse/FLINK-34252
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.3, 1.17.2, 1.18.1
>Reporter: David Christle
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.2
>
>
> The WatermarkAssignerOperator in the table runtime incorrectly transitions to 
> an IDLE state even when data is continuously flowing. This behavior, observed 
> under normal operating conditions where the interval between data elements is 
> shorter than the configured idleTimeout, leads to regular transitions between 
> ACTIVE and IDLE states, which are unnecessary.
> _Detail:_
> In the current implementation, the lastRecordTime variable, which tracks the 
> time of the last received data element, is updated only when the 
> WatermarkStatus transitions from IDLE to ACTIVE. However, it is not updated 
> when WatermarkStatus is ACTIVE, which means even under continuous data flow, 
> the condition `(currentTime - lastRecordTime > idleTimeout)` will eventually 
> always become true, and the WatermarkStatus will erroneously be marked IDLE. 
> It is unclear to me if this bug produces any incorrectness downstream, since 
> when the WatermarkStatus is in the IDLE state, the next processElement 
> will cause a WatermarkStatus.ACTIVE to be emitted. Nevertheless, we should 
> eliminate this flip-flop behavior between states.
> The test I wrote fails without the fix and illustrates the flip-flops:
> {noformat}
> [ERROR] Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.030 
> s <<< FAILURE! -- in 
> org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest
> [ERROR] 
> org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest.testIdleStateAvoidanceWithConsistentDataFlow
>  -- Time elapsed: 0.013 s <<< FAILURE!
> java.lang.AssertionError:
> Expecting
>   [WatermarkStatus(IDLE),
> WatermarkStatus(ACTIVE),
> WatermarkStatus(IDLE),
> WatermarkStatus(ACTIVE),
> WatermarkStatus(IDLE),
> WatermarkStatus(ACTIVE),
> WatermarkStatus(IDLE),
> WatermarkStatus(ACTIVE),
> WatermarkStatus(IDLE)]
> not to contain
>   [WatermarkStatus(IDLE)]
> but found
>   [WatermarkStatus(IDLE)]
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35629) Performance regression in stringRead and stringWrite

2024-06-19 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856172#comment-17856172
 ] 

Piotr Nowojski commented on FLINK-35629:


It's strange, why is the JMH upgrade causing this? Isn't this yet another case 
of the JIT going crazy due to some unrelated change?

Anyway +1 for just accepting it.

> Performance regression in stringRead and stringWrite
> 
>
> Key: FLINK-35629
> URL: https://issues.apache.org/jira/browse/FLINK-35629
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
> Fix For: 1.20.0
>
> Attachments: image-2024-06-18-14-52-55-164.png
>
>
> [http://flink-speed.xyz/timeline/#/?exe=1&ben=stringWrite.128.ascii&extr=on&quarts=on&equid=off&env=3&revs=200]
> [http://flink-speed.xyz/timeline/#/?exe=1&ben=stringWrite.128.chinese&extr=on&quarts=on&equid=off&env=3&revs=200]
> [http://flink-speed.xyz/timeline/#/?exe=1&ben=stringWrite.128.russian&extr=on&quarts=on&equid=off&env=3&revs=200]
> [http://flink-speed.xyz/timeline/#/?exe=6&ben=stringRead.128.chinese&extr=on&quarts=on&equid=off&env=3&revs=200]
> [http://flink-speed.xyz/timeline/#/?exe=6&ben=stringRead.4.ascii&extr=on&quarts=on&equid=off&env=3&revs=200]
> [http://flink-speed.xyz/timeline/#/?exe=6&ben=stringRead.4.chinese&extr=on&quarts=on&equid=off&env=3&revs=200]
> [http://flink-speed.xyz/timeline/#/?exe=6&ben=stringRead.4.russian&extr=on&quarts=on&equid=off&env=3&revs=200]
> !image-2024-06-18-14-52-55-164.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25646) Document buffer debloating issues with high parallelism

2024-06-19 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856273#comment-17856273
 ] 

Piotr Nowojski commented on FLINK-25646:


Hi [~hejufang001]. No, there has been no progress on this issue.

> floating buffers per gate: 8 (default value) -> 2000

does it mean that, with buffer debloating enabled, you have increased the 
floating buffers per gate from 8 to 2000?

Also, have you tried setting a higher debloating target? AFAIR we have 
observed that increasing the target from 1s (default) to 5s seems to behave 
much better when it comes to the max throughput.
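
For reference, a sketch of how those knobs would be set programmatically (key 
names as I remember them; please double-check against the documentation for 
your Flink version):

{code:java}
import org.apache.flink.configuration.Configuration;

public class DebloatConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("taskmanager.network.memory.buffer-debloat.enabled", "true");
        // Higher debloating target, as suggested above (the default is 1 s).
        conf.setString("taskmanager.network.memory.buffer-debloat.target", "5 s");
        // More floating buffers per gate, as in the reported setup.
        conf.setString("taskmanager.network.memory.floating-buffers-per-gate", "2000");
    }
}
{code}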

> Document buffer debloating issues with high parallelism
> ---
>
> Key: FLINK-25646
> URL: https://issues.apache.org/jira/browse/FLINK-25646
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / Network
>Affects Versions: 1.14.0
>Reporter: Anton Kalashnikov
>Assignee: Anton Kalashnikov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> According to the latest benchmarks, there are some problems with buffer 
> debloating when a job has high parallelism. What counts as high parallelism 
> differs from job to job, but in general it is more than 200. So it makes 
> sense to document that problem and propose the solution: increasing the 
> number of buffers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25646) Document buffer debloating issues with high parallelism

2024-06-24 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17859595#comment-17859595
 ] 

Piotr Nowojski commented on FLINK-25646:


Yes, that would be the best. Neither I nor [~akalashnikov] are currently 
looking into it. From our past investigation it looked like subtasks in the 
cluster ended up oscillating in terms of CPU usage/busyness. They were 100% 
busy for some time, then idle for a short period, then 100% busy again. It was 
strange, clearly bogus, but we haven't managed to nail down why it is 
happening.

> Document buffer debloating issues with high parallelism
> ---
>
> Key: FLINK-25646
> URL: https://issues.apache.org/jira/browse/FLINK-25646
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / Network
>Affects Versions: 1.14.0
>Reporter: Anton Kalashnikov
>Assignee: Anton Kalashnikov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> According to the latest benchmarks, there are some problems with buffer 
> debloating when a job has high parallelism. What counts as high parallelism 
> differs from job to job, but in general it is more than 200. So it makes 
> sense to document that problem and propose the solution: increasing the 
> number of buffers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-25646) Document buffer debloating issues with high parallelism

2024-06-24 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17859595#comment-17859595
 ] 

Piotr Nowojski edited comment on FLINK-25646 at 6/24/24 7:39 AM:
-

Yes, that would be the best. Neither I nor [~akalashnikov] are currently 
looking into it. From our past investigation it looked like subtasks in the 
cluster ended up oscillating in terms of CPU usage/busyness. They were 100% 
busy for some time, then idle for a short period, then 100% busy again. 
Busy/idle as in Flink's busy/idle metrics. It was strange, clearly bogus, but 
we haven't managed to nail down why it is happening.


was (Author: pnowojski):
Yes, that would be the best. Neither I nor [~akalashnikov] are currently 
looking into it. From our past investigation it looked like subtasks in the 
cluster ended up oscillating in terms of CPU usage/busyness. They were 100% 
busy for some time, then idle for a short period, then 100% busy again. It was 
strange, clearly bogus, but we haven't managed to nail down why it is 
happening.

> Document buffer debloating issues with high parallelism
> ---
>
> Key: FLINK-25646
> URL: https://issues.apache.org/jira/browse/FLINK-25646
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / Network
>Affects Versions: 1.14.0
>Reporter: Anton Kalashnikov
>Assignee: Anton Kalashnikov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> According to the latest benchmarks, there are some problems with buffer 
> debloating when a job has high parallelism. What counts as high parallelism 
> differs from job to job, but in general it is more than 200. So it makes 
> sense to document that problem and propose the solution: increasing the 
> number of buffers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35614) Release Testing Instructions: Verify FLIP-443: Interruptible timers firing

2024-07-02 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-35614.
--
Resolution: Won't Fix

I think there is no need to test this feature:
* it is already covered by an existing set of quite extensive e2e tests (in 
the form of `ITCase` tests)
* it wouldn't be easy to set up a test
* it wouldn't be easy to actually validate from the user's perspective whether 
things are working as expected

> Release Testing Instructions: Verify  FLIP-443: Interruptible timers firing 
> 
>
> Key: FLINK-35614
> URL: https://issues.apache.org/jira/browse/FLINK-35614
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: Rui Fan
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.20.0
>
>
> Follow up the test for https://issues.apache.org/jira/browse/FLINK-20217



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35739) FLIP-444: Native file copy support

2024-07-02 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35739:
--

 Summary: FLIP-444: Native file copy support
 Key: FLINK-35739
 URL: https://issues.apache.org/jira/browse/FLINK-35739
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / FileSystem
Reporter: Piotr Nowojski


https://cwiki.apache.org/confluence/display/FLINK/FLIP-444%3A+Native+file+copy+support

State downloading in Flink can be a time- and CPU-consuming operation, which is 
especially visible if CPU resources per task slot are strictly restricted to, 
for example, a single CPU. Downloading 1 GB of state can take a significant 
amount of time, while the code doing so is quite inefficient.

Currently, when downloading state files, Flink creates an FSDataInputStream 
from the remote file and copies its bytes to an OutputStream pointing to a 
local file (in the RocksDBStateDownloader#downloadDataForStateHandle method). 
FSDataInputStream is internally wrapped by many layers of abstractions and 
indirections and, what's worse, every file is copied individually, which leads 
to quite high overheads for small files. Download times and the CPU efficiency 
of the download process could be significantly improved if we introduced an 
API allowing org.apache.flink.core.fs.FileSystem to copy many files natively, 
all at once.
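
To make the inefficiency concrete, here is a simplified sketch of what the 
current per-file download boils down to. This is an illustration of the 
pattern, not the actual RocksDBStateDownloader code:

{code:java}
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class NaiveStateDownload {
    // Streams a single remote file to a local one, buffer by buffer. Every
    // file goes through this loop individually, paying the full stream setup
    // and abstraction overhead each time, which is costly for many small files.
    static void downloadOneFile(Path remotePath, File localFile) throws IOException {
        FileSystem fs = remotePath.getFileSystem();
        try (FSDataInputStream in = fs.open(remotePath);
                OutputStream out = new FileOutputStream(localFile)) {
            byte[] buffer = new byte[64 * 1024];
            int bytesRead;
            while ((bytesRead = in.read(buffer)) >= 0) {
                out.write(buffer, 0, bytesRead);
            }
        }
    }
}
{code}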

For S3, there are at least two potential implementations. The first one is 
using AWS SDKv2 directly (Flink currently uses AWS SDKv1 wrapped by 
hadoop/presto) and the Amazon S3 Transfer Manager. The second option is to use 
a 3rd-party tool called s5cmd, which is claimed to be a faster alternative to 
the official AWS clients; this was confirmed by our benchmarks.
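
As a minimal sketch of the idea only: the names below (PathsCopyingFileSystem, 
CopyRequest) are hypothetical placeholders, not the final FLIP-444 API. The 
point is that a single batch call lets the implementation parallelize and 
amortize per-file overheads:

{code:java}
import org.apache.flink.core.fs.Path;

import java.io.IOException;
import java.util.List;

// Hypothetical sketch: a FileSystem capability for copying many files in one
// native batch, so the implementation (e.g. the S3 Transfer Manager or an
// external s5cmd process) can parallelize and amortize per-file overheads.
interface PathsCopyingFileSystem {

    // One source -> destination pair; both names are placeholders.
    final class CopyRequest {
        final Path source;
        final Path destination;

        CopyRequest(Path source, Path destination) {
            this.source = source;
            this.destination = destination;
        }
    }

    // Copies all requested files at once, instead of streaming each file
    // individually through FSDataInputStream.
    void copyFiles(List<CopyRequest> requests) throws IOException;
}
{code}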



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35739) FLIP-444: Native file copy support

2024-07-05 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-35739:
---
Component/s: Runtime / State Backends

> FLIP-444: Native file copy support
> --
>
> Key: FLINK-35739
> URL: https://issues.apache.org/jira/browse/FLINK-35739
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem, Runtime / State Backends
>Reporter: Piotr Nowojski
>Priority: Major
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-444%3A+Native+file+copy+support
> State downloading in Flink can be a time- and CPU-consuming operation, which 
> is especially visible if CPU resources per task slot are strictly restricted 
> to, for example, a single CPU. Downloading 1 GB of state can take a 
> significant amount of time, while the code doing so is quite inefficient.
> Currently, when downloading state files, Flink creates an FSDataInputStream 
> from the remote file and copies its bytes to an OutputStream pointing to a 
> local file (in the RocksDBStateDownloader#downloadDataForStateHandle method). 
> FSDataInputStream is internally wrapped by many layers of abstractions and 
> indirections and, what's worse, every file is copied individually, which 
> leads to quite high overheads for small files. Download times and the CPU 
> efficiency of the download process could be significantly improved if we 
> introduced an API allowing org.apache.flink.core.fs.FileSystem to copy many 
> files natively, all at once.
> For S3, there are at least two potential implementations. The first one is 
> using AWS SDKv2 directly (Flink currently uses AWS SDKv1 wrapped by 
> hadoop/presto) and the Amazon S3 Transfer Manager. The second option is to 
> use a 3rd-party tool called s5cmd, which is claimed to be a faster 
> alternative to the official AWS clients; this was confirmed by our 
> benchmarks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35768) Use native file copy in RocksDBStateDownloader

2024-07-05 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35768:
--

 Summary: Use native file copy in RocksDBStateDownloader
 Key: FLINK-35768
 URL: https://issues.apache.org/jira/browse/FLINK-35768
 Project: Flink
  Issue Type: Sub-task
Reporter: Piotr Nowojski






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35767) Provide native file copy support for S3 using s5cmd

2024-07-05 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35767:
--

 Summary: Provide native file copy support for S3 using s5cmd
 Key: FLINK-35767
 URL: https://issues.apache.org/jira/browse/FLINK-35767
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / FileSystem
Reporter: Piotr Nowojski






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35771) Limit s5cmd resource usage

2024-07-05 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35771:
--

 Summary: Limit s5cmd resource usage
 Key: FLINK-35771
 URL: https://issues.apache.org/jira/browse/FLINK-35771
 Project: Flink
  Issue Type: Sub-task
Reporter: Piotr Nowojski






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35739) FLIP-444: Native file copy support

2024-07-05 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-35739:
--

Assignee: Piotr Nowojski

> FLIP-444: Native file copy support
> --
>
> Key: FLINK-35739
> URL: https://issues.apache.org/jira/browse/FLINK-35739
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem, Runtime / State Backends
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-444%3A+Native+file+copy+support
> State downloading in Flink can be a time- and CPU-consuming operation, which 
> is especially visible if CPU resources per task slot are strictly restricted 
> to, for example, a single CPU. Downloading 1 GB of state can take a 
> significant amount of time, while the code doing so is quite inefficient.
> Currently, when downloading state files, Flink creates an FSDataInputStream 
> from the remote file and copies its bytes to an OutputStream pointing to a 
> local file (in the RocksDBStateDownloader#downloadDataForStateHandle method). 
> FSDataInputStream is internally wrapped by many layers of abstractions and 
> indirections and, what's worse, every file is copied individually, which 
> leads to quite high overheads for small files. Download times and the CPU 
> efficiency of the download process could be significantly improved if we 
> introduced an API allowing org.apache.flink.core.fs.FileSystem to copy many 
> files natively, all at once.
> For S3, there are at least two potential implementations. The first one is 
> using AWS SDKv2 directly (Flink currently uses AWS SDKv1 wrapped by 
> hadoop/presto) and the Amazon S3 Transfer Manager. The second option is to 
> use a 3rd-party tool called s5cmd, which is claimed to be a faster 
> alternative to the official AWS clients; this was confirmed by our 
> benchmarks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35767) Provide native file copy support for S3 using s5cmd

2024-07-05 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-35767:
--

Assignee: Piotr Nowojski

> Provide native file copy support for S3 using s5cmd
> ---
>
> Key: FLINK-35767
> URL: https://issues.apache.org/jira/browse/FLINK-35767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / FileSystem
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35770) Interrupt s5cmd call on cancellation

2024-07-05 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35770:
--

 Summary: Interrupt s5cmd call on cancellation 
 Key: FLINK-35770
 URL: https://issues.apache.org/jira/browse/FLINK-35770
 Project: Flink
  Issue Type: Sub-task
Reporter: Piotr Nowojski






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35772) Deprecate/remove DuplicatingFileSystem

2024-07-05 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35772:
--

 Summary: Deprecate/remove DuplicatingFileSystem
 Key: FLINK-35772
 URL: https://issues.apache.org/jira/browse/FLINK-35772
 Project: Flink
  Issue Type: Sub-task
Reporter: Piotr Nowojski






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35768) Use native file copy in RocksDBStateDownloader

2024-07-05 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-35768:
--

Assignee: Piotr Nowojski

> Use native file copy in RocksDBStateDownloader
> --
>
> Key: FLINK-35768
> URL: https://issues.apache.org/jira/browse/FLINK-35768
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35773) Document s5cmd

2024-07-05 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35773:
--

 Summary: Document s5cmd
 Key: FLINK-35773
 URL: https://issues.apache.org/jira/browse/FLINK-35773
 Project: Flink
  Issue Type: Sub-task
Reporter: Piotr Nowojski






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35821) ResumeCheckpointManuallyITCase failed with File X does not exist or the user running Flink C has insufficient permissions to access it

2024-07-11 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35821:
--

 Summary: ResumeCheckpointManuallyITCase failed with File X does 
not exist or the user running Flink C has insufficient permissions to access it
 Key: FLINK-35821
 URL: https://issues.apache.org/jira/browse/FLINK-35821
 Project: Flink
  Issue Type: Bug
  Components: Test Infrastructure
Affects Versions: 2.0.0, 1.20.0
Reporter: Piotr Nowojski


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60857&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba


{noformat}
Jul 11 13:49:46 13:49:46.412 [ERROR] Tests run: 48, Failures: 0, Errors: 1, 
Skipped: 0, Time elapsed: 309.7 s <<< FAILURE! -- in 
org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase
Jul 11 13:49:46 13:49:46.412 [ERROR] 
org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase.testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper[RestoreMode
 = CLAIM] -- Time elapsed: 2.722 s <<< ERROR!
Jul 11 13:49:46 org.apache.flink.runtime.JobException: Recovery is suppressed 
by NoRestartBackoffTimeStrategy
Jul 11 13:49:46 at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
Jul 11 13:49:46 at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
Jul 11 13:49:46 at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121)
Jul 11 13:49:46 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281)
Jul 11 13:49:46 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272)
Jul 11 13:49:46 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265)
Jul 11 13:49:46 at 
org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:800)
Jul 11 13:49:46 at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:777)
Jul 11 13:49:46 at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
Jul 11 13:49:46 at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515)
Jul 11 13:49:46 at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown 
Source)
Jul 11 13:49:46 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Jul 11 13:49:46 at java.lang.reflect.Method.invoke(Method.java:498)
Jul 11 13:49:46 at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
Jul 11 13:49:46 at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
Jul 11 13:49:46 at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
Jul 11 13:49:46 at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
Jul 11 13:49:46 at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
Jul 11 13:49:46 at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
Jul 11 13:49:46 at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
Jul 11 13:49:46 at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
Jul 11 13:49:46 at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
Jul 11 13:49:46 at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
Jul 11 13:49:46 at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
Jul 11 13:49:46 at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
Jul 11 13:49:46 at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
Jul 11 13:49:46 at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
Jul 11 13:49:46 at 
org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
Jul 11 13:49:46 at 
org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
Jul 11 13:49:46 at 
org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
Jul 11 13:49:46 at 
org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
Jul 11 13:49:46 at 
org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
Jul 11 13:49:46 at 
org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)

{noformat}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

[jira] [Updated] (FLINK-35821) ResumeCheckpointManuallyITCase failed with File X does not exist or the user running Flink C has insufficient permissions to access it

2024-07-11 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-35821:
---
Description: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60857&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba

primary failure:

{noformat}
Caused by: java.io.FileNotFoundException: File 
file:/tmp/junit5368809541399217009/junit9107863722486384012/5a045e6c0cd0297faf5a2bf6fff27465/shared/job_5a045e6c0cd0297faf5a2bf6fff27465_op_90bea66de1c231edf33913ecd54406c1_1_2/0effb888-aa59-4bc4-b3e6-02622c831863
 does not exist or the user running Flink ('agent01_azpcontainer') has 
insufficient permissions to access it.
{noformat}

Full stack trace

{noformat}
2024-07-11T13:49:46.4137693Z Jul 11 13:49:46 13:49:46.412 [ERROR] Tests run: 
48, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 309.7 s <<< FAILURE! -- 
in org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase
2024-07-11T13:49:46.4139710Z Jul 11 13:49:46 13:49:46.412 [ERROR] 
org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase.testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper[RestoreMode
 = CLAIM] -- Time elapsed: 2.722 s <<< ERROR!
2024-07-11T13:49:46.4140928Z Jul 11 13:49:46 
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
2024-07-11T13:49:46.4142766Z Jul 11 13:49:46at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
2024-07-11T13:49:46.4144185Z Jul 11 13:49:46at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
2024-07-11T13:49:46.4145249Z Jul 11 13:49:46at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121)
2024-07-11T13:49:46.4146510Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281)
2024-07-11T13:49:46.4147599Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272)
2024-07-11T13:49:46.4148975Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265)
2024-07-11T13:49:46.4150467Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:800)
2024-07-11T13:49:46.4151977Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:777)
2024-07-11T13:49:46.4153308Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
2024-07-11T13:49:46.4154713Z Jul 11 13:49:46at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515)
2024-07-11T13:49:46.4155416Z Jul 11 13:49:46at 
sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
2024-07-11T13:49:46.4156342Z Jul 11 13:49:46at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2024-07-11T13:49:46.4157291Z Jul 11 13:49:46at 
java.lang.reflect.Method.invoke(Method.java:498)
2024-07-11T13:49:46.4158065Z Jul 11 13:49:46at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
2024-07-11T13:49:46.4159387Z Jul 11 13:49:46at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
2024-07-11T13:49:46.4160469Z Jul 11 13:49:46at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
2024-07-11T13:49:46.4161819Z Jul 11 13:49:46at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
2024-07-11T13:49:46.4163253Z Jul 11 13:49:46at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
2024-07-11T13:49:46.4164717Z Jul 11 13:49:46at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
2024-07-11T13:49:46.4165948Z Jul 11 13:49:46at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
2024-07-11T13:49:46.4167080Z Jul 11 13:49:46at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
2024-07-11T13:49:46.4168228Z Jul 11 13:49:46at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
2024-07-11T13:49:46.4169380Z Jul 11 13:49:46at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
2024-07-11T13:49:46.4170327Z Jul 11 13:49:46at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
2024-07-11T13:49:46.4171192Z Jul 11 13:49:46at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scal

[jira] [Updated] (FLINK-35821) ResumeCheckpointManuallyITCase failed with File X does not exist or the user running Flink C has insufficient permissions to access it

2024-07-11 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-35821:
---
Description: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60857&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba


{noformat}
2024-07-11T13:49:46.4137693Z Jul 11 13:49:46 13:49:46.412 [ERROR] Tests run: 
48, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 309.7 s <<< FAILURE! -- 
in org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase
2024-07-11T13:49:46.4139710Z Jul 11 13:49:46 13:49:46.412 [ERROR] 
org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase.testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper[RestoreMode
 = CLAIM] -- Time elapsed: 2.722 s <<< ERROR!
2024-07-11T13:49:46.4140928Z Jul 11 13:49:46 
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
2024-07-11T13:49:46.4142766Z Jul 11 13:49:46at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
2024-07-11T13:49:46.4144185Z Jul 11 13:49:46at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
2024-07-11T13:49:46.4145249Z Jul 11 13:49:46at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121)
2024-07-11T13:49:46.4146510Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281)
2024-07-11T13:49:46.4147599Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272)
2024-07-11T13:49:46.4148975Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265)
2024-07-11T13:49:46.4150467Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:800)
2024-07-11T13:49:46.4151977Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:777)
2024-07-11T13:49:46.4153308Z Jul 11 13:49:46at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
2024-07-11T13:49:46.4154713Z Jul 11 13:49:46at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515)
2024-07-11T13:49:46.4155416Z Jul 11 13:49:46at 
sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
2024-07-11T13:49:46.4156342Z Jul 11 13:49:46at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2024-07-11T13:49:46.4157291Z Jul 11 13:49:46at 
java.lang.reflect.Method.invoke(Method.java:498)
2024-07-11T13:49:46.4158065Z Jul 11 13:49:46at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
2024-07-11T13:49:46.4159387Z Jul 11 13:49:46at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
2024-07-11T13:49:46.4160469Z Jul 11 13:49:46at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
2024-07-11T13:49:46.4161819Z Jul 11 13:49:46at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
2024-07-11T13:49:46.4163253Z Jul 11 13:49:46at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
2024-07-11T13:49:46.4164717Z Jul 11 13:49:46at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
2024-07-11T13:49:46.4165948Z Jul 11 13:49:46at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
2024-07-11T13:49:46.4167080Z Jul 11 13:49:46at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
2024-07-11T13:49:46.4168228Z Jul 11 13:49:46at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
2024-07-11T13:49:46.4169380Z Jul 11 13:49:46at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
2024-07-11T13:49:46.4170327Z Jul 11 13:49:46at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
2024-07-11T13:49:46.4171192Z Jul 11 13:49:46at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
2024-07-11T13:49:46.4171814Z Jul 11 13:49:46at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
2024-07-11T13:49:46.4172433Z Jul 11 13:49:46at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
2024-07-11T13:49:46.4173029Z Jul 11 13:49:46at 
org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
2024-07-11T13:49:46.4173622Z Jul 11 13:49:46at 
org.apache.pekko.act

[jira] [Commented] (FLINK-35754) SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal Server Error

2024-07-11 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865115#comment-17865115
 ] 

Piotr Nowojski commented on FLINK-35754:


[~hackergin], can you elaborate on what the problem is? Is this the same 
problem as https://issues.apache.org/jira/browse/FLINK-35821 ?

> SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal 
> Server Error
> -
>
> Key: FLINK-35754
> URL: https://issues.apache.org/jira/browse/FLINK-35754
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-07-05-11-15-52-731.png
>
>
> {code:java}
> Jul 03 03:14:31 java.lang.RuntimeException: Execute statement failed
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:592)
> Jul 03 03:14:31   at 
> org.apache.flink.table.gateway.SqlGatewayE2ECase.lambda$testMaterializedTableInFullMode$3(SqlGatewayE2ECase.java:353)
> Jul 03 03:14:31   at 
> org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:210)
> Jul 03 03:14:31   at 
> org.apache.flink.table.gateway.SqlGatewayE2ECase.testMaterializedTableInFullMode(SqlGatewayE2ECase.java:350)
> Jul 03 03:14:31   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:568)
> Jul 03 03:14:31   at 
> org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
> Jul 03 03:14:31   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jul 03 03:14:31   at 
> org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29)
> Jul 03 03:14:31 Caused by: java.lang.RuntimeException: The rest request is 
> not successful: Internal Server Error
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.sendRequest(FlinkDistribution.java:601)
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.getOperationResult(FlinkDistribution.java:570)
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:590)
> Jul 03 03:14:31   ... 7 more
> Jul 03 03:14:31 
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60620&view=logs&j=e8e46ef5-75cc-564f-c2bd-1797c35cbebe&t=60c49903-2505-5c25-7e46-de91b1737bea&l=14602



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35754) SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal Server Error

2024-07-11 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865115#comment-17865115
 ] 

Piotr Nowojski edited comment on FLINK-35754 at 7/11/24 3:08 PM:
-

[~hackergin], can you elaborate on what the problem was here? Is this the same 
problem as https://issues.apache.org/jira/browse/FLINK-35821 ?


was (Author: pnowojski):
[~hackergin], can you elaborate on what the problem is? Is this the same 
problem as https://issues.apache.org/jira/browse/FLINK-35821 ?

> SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal 
> Server Error
> -
>
> Key: FLINK-35754
> URL: https://issues.apache.org/jira/browse/FLINK-35754
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-07-05-11-15-52-731.png
>
>
> {code:java}
> Jul 03 03:14:31 java.lang.RuntimeException: Execute statement failed
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:592)
> Jul 03 03:14:31   at 
> org.apache.flink.table.gateway.SqlGatewayE2ECase.lambda$testMaterializedTableInFullMode$3(SqlGatewayE2ECase.java:353)
> Jul 03 03:14:31   at 
> org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:210)
> Jul 03 03:14:31   at 
> org.apache.flink.table.gateway.SqlGatewayE2ECase.testMaterializedTableInFullMode(SqlGatewayE2ECase.java:350)
> Jul 03 03:14:31   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:568)
> Jul 03 03:14:31   at 
> org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
> Jul 03 03:14:31   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jul 03 03:14:31   at 
> org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29)
> Jul 03 03:14:31 Caused by: java.lang.RuntimeException: The rest request is 
> not successful: Internal Server Error
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.sendRequest(FlinkDistribution.java:601)
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.getOperationResult(FlinkDistribution.java:570)
> Jul 03 03:14:31   at 
> org.apache.flink.tests.util.flink.FlinkDistribution$TestSqlGatewayRestClient.executeStatementWithResult(FlinkDistribution.java:590)
> Jul 03 03:14:31   ... 7 more
> Jul 03 03:14:31 
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60620&view=logs&j=e8e46ef5-75cc-564f-c2bd-1797c35cbebe&t=60c49903-2505-5c25-7e46-de91b1737bea&l=14602



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35821) ResumeCheckpointManuallyITCase failed with File X does not exist or the user running Flink C has insufficient permissions to access it

2024-07-12 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865353#comment-17865353
 ] 

Piotr Nowojski commented on FLINK-35821:


Thanks for the update. After rebasing, Azure is green, so let's close this one.

> ResumeCheckpointManuallyITCase failed with File X does not exist or the user 
> running Flink C has insufficient permissions to access it
> --
>
> Key: FLINK-35821
> URL: https://issues.apache.org/jira/browse/FLINK-35821
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure
>Affects Versions: 2.0.0, 1.20.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60857&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba
> primary failure:
> {noformat}
> Caused by: java.io.FileNotFoundException: File 
> file:/tmp/junit5368809541399217009/junit9107863722486384012/5a045e6c0cd0297faf5a2bf6fff27465/shared/job_5a045e6c0cd0297faf5a2bf6fff27465_op_90bea66de1c231edf33913ecd54406c1_1_2/0effb888-aa59-4bc4-b3e6-02622c831863
>  does not exist or the user running Flink ('agent01_azpcontainer') has 
> insufficient permissions to access it.
> {noformat}
> Full stack trace
> {noformat}
> 2024-07-11T13:49:46.4137693Z Jul 11 13:49:46 13:49:46.412 [ERROR] Tests run: 
> 48, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 309.7 s <<< FAILURE! -- 
> in org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase
> 2024-07-11T13:49:46.4139710Z Jul 11 13:49:46 13:49:46.412 [ERROR] 
> org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase.testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper[RestoreMode
>  = CLAIM] -- Time elapsed: 2.722 s <<< ERROR!
> 2024-07-11T13:49:46.4140928Z Jul 11 13:49:46 
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> 2024-07-11T13:49:46.4142766Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
> 2024-07-11T13:49:46.4144185Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
> 2024-07-11T13:49:46.4145249Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121)
> 2024-07-11T13:49:46.4146510Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281)
> 2024-07-11T13:49:46.4147599Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272)
> 2024-07-11T13:49:46.4148975Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265)
> 2024-07-11T13:49:46.4150467Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:800)
> 2024-07-11T13:49:46.4151977Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:777)
> 2024-07-11T13:49:46.4153308Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
> 2024-07-11T13:49:46.4154713Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515)
> 2024-07-11T13:49:46.4155416Z Jul 11 13:49:46  at 
> sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
> 2024-07-11T13:49:46.4156342Z Jul 11 13:49:46  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2024-07-11T13:49:46.4157291Z Jul 11 13:49:46  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2024-07-11T13:49:46.4158065Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
> 2024-07-11T13:49:46.4159387Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> 2024-07-11T13:49:46.4160469Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
> 2024-07-11T13:49:46.4161819Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
> 2024-07-11T13:49:46.4163253Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
> 2024-07-11T13:49:46.4164717Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.rpc

[jira] [Closed] (FLINK-35821) ResumeCheckpointManuallyITCase failed with File X does not exist or the user running Flink C has insufficient permissions to access it

2024-07-12 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-35821.
--
Resolution: Duplicate

> ResumeCheckpointManuallyITCase failed with File X does not exist or the user 
> running Flink C has insufficient permissions to access it
> --
>
> Key: FLINK-35821
> URL: https://issues.apache.org/jira/browse/FLINK-35821
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure
>Affects Versions: 2.0.0, 1.20.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60857&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba
> primary failure:
> {noformat}
> Caused by: java.io.FileNotFoundException: File 
> file:/tmp/junit5368809541399217009/junit9107863722486384012/5a045e6c0cd0297faf5a2bf6fff27465/shared/job_5a045e6c0cd0297faf5a2bf6fff27465_op_90bea66de1c231edf33913ecd54406c1_1_2/0effb888-aa59-4bc4-b3e6-02622c831863
>  does not exist or the user running Flink ('agent01_azpcontainer') has 
> insufficient permissions to access it.
> {noformat}
> Full stack trace
> {noformat}
> 2024-07-11T13:49:46.4137693Z Jul 11 13:49:46 13:49:46.412 [ERROR] Tests run: 
> 48, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 309.7 s <<< FAILURE! -- 
> in org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase
> 2024-07-11T13:49:46.4139710Z Jul 11 13:49:46 13:49:46.412 [ERROR] 
> org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase.testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper[RestoreMode
>  = CLAIM] -- Time elapsed: 2.722 s <<< ERROR!
> 2024-07-11T13:49:46.4140928Z Jul 11 13:49:46 
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> 2024-07-11T13:49:46.4142766Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
> 2024-07-11T13:49:46.4144185Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
> 2024-07-11T13:49:46.4145249Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121)
> 2024-07-11T13:49:46.4146510Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281)
> 2024-07-11T13:49:46.4147599Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272)
> 2024-07-11T13:49:46.4148975Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265)
> 2024-07-11T13:49:46.4150467Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:800)
> 2024-07-11T13:49:46.4151977Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:777)
> 2024-07-11T13:49:46.4153308Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
> 2024-07-11T13:49:46.4154713Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515)
> 2024-07-11T13:49:46.4155416Z Jul 11 13:49:46  at 
> sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
> 2024-07-11T13:49:46.4156342Z Jul 11 13:49:46  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2024-07-11T13:49:46.4157291Z Jul 11 13:49:46  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2024-07-11T13:49:46.4158065Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
> 2024-07-11T13:49:46.4159387Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> 2024-07-11T13:49:46.4160469Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
> 2024-07-11T13:49:46.4161819Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
> 2024-07-11T13:49:46.4163253Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
> 2024-07-11T13:49:46.4164717Z Jul 11 13:49:46  at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
> 2024-07-11T13:49:46.4165948Z Jul 11 13:49:46  at 
>

[jira] [Closed] (FLINK-35768) Use native file copy in RocksDBStateDownloader

2024-07-12 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-35768.
--
Fix Version/s: 2.0.0
   1.20.0
   Resolution: Fixed

Merged to master as 957804a9591..a056e0230e8

> Use native file copy in RocksDBStateDownloader
> --
>
> Key: FLINK-35768
> URL: https://issues.apache.org/jira/browse/FLINK-35768
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35767) Provide native file copy support for S3 using s5cmd

2024-07-12 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-35767.
--
Fix Version/s: 2.0.0
   1.20.0
   Resolution: Fixed

Merged to master as 5c4b1aa2bd9 and 1da48e53fa7

> Provide native file copy support for S3 using s5cmd
> ---
>
> Key: FLINK-35767
> URL: https://issues.apache.org/jira/browse/FLINK-35767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / FileSystem
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
> Fix For: 2.0.0, 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35860) S5CmdOnMinioITCase failed due to IllegalAccessError in JDK17/21

2024-07-18 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-35860:
--

Assignee: Piotr Nowojski

> S5CmdOnMinioITCase failed due to IllegalAccessError in JDK17/21
> ---
>
> Key: FLINK-35860
> URL: https://issues.apache.org/jira/browse/FLINK-35860
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 2.0.0
>Reporter: Weijie Guo
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> {code:java}
> Jul 16 01:31:05 Caused by: java.lang.IllegalAccessError: failed to access 
> class org.apache.flink.fs.s3.common.S5CmdOnMinioITCase$Record from class 
> org.apache.flink.fs.s3.common.S5CmdOnMinioITCase$RecordConstructorAccess 
> (org.apache.flink.fs.s3.common.S5CmdOnMinioITCase$Record is in unnamed module 
> of loader 'app'; 
> org.apache.flink.fs.s3.common.S5CmdOnMinioITCase$RecordConstructorAccess is 
> in unnamed module of loader com.esotericsoftware.reflectasm.AccessClassLoader 
> @76f35a1b)
> Jul 16 01:31:05   at 
> org.apache.flink.fs.s3.common.S5CmdOnMinioITCase$RecordConstructorAccess.newInstance(Unknown
>  Source)
> Jul 16 01:31:05   at 
> com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy$1.newInstance(Kryo.java:1193)
> Jul 16 01:31:05   at 
> com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
> Jul 16 01:31:05   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
> Jul 16 01:31:05   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
> Jul 16 01:31:05   at 
> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> Jul 16 01:31:05   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:394)
> Jul 16 01:31:05   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:206)
> Jul 16 01:31:05   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:44)
> Jul 16 01:31:05   at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
> Jul 16 01:31:05   at 
> org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
> Jul 16 01:31:05   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
> Jul 16 01:31:05   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
> Jul 16 01:31:05   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
> Jul 16 01:31:05   at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:103)
> Jul 16 01:31:05   at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> Jul 16 01:31:05   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
> Jul 16 01:31:05   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> Jul 16 01:31:05   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
> Jul 16 01:31:05   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
> Jul 16 01:31:05   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
> Jul 16 01:31:05   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
> Jul 16 01:31:05   at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
> Jul 16 01:31:05   at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> Jul 16 01:31:05   at java.base/java.lang.Thread.run(Thread.java:833)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60941&view=logs&j=54dde87f-1e30-5a5e-7727-3286687a13f7&t=4653b7d3-60bf-5409-5154-679ed6d8688e&l=13718



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

