[jira] [Created] (FLINK-33486) Pulsar Client Send Timeout Terminates TaskManager

2023-11-08 Thread Jason Kania (Jira)
Jason Kania created FLINK-33486:
---

 Summary: Pulsar Client Send Timeout Terminates TaskManager
 Key: FLINK-33486
 URL: https://issues.apache.org/jira/browse/FLINK-33486
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.17.1
Reporter: Jason Kania


Currently, when the Pulsar producer encounters a timeout while attempting to 
send data, it generates an unhandled TimeoutException. This is not a reasonable 
way to handle the timeout. The situation should be handled gracefully, either 
through additional parameters that put the choice of action at the discretion 
of the user, or through some callback mechanism against which the user can 
write handling code. Unfortunately, right now, the timeout causes a termination 
of the task manager, which then leads to other issues.

Increasing the timeout period only makes the issue less likely; it does not 
ensure proper handling in the event that the timeout does occur.
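
As a rough illustration of the callback mechanism proposed above, a handler 
interface might look like the sketch below. This is hypothetical: the interface 
name, the SendFailureAction values, and any corresponding builder method are 
invented for illustration and do not exist in the connector today.

{code:java}
// Hypothetical sketch only; no such API exists in the connector today.
// It illustrates the user-controlled failure handling proposed above.
public interface PulsarSendFailureHandler extends java.io.Serializable {

    /** What the writer should do after a failed send. */
    enum SendFailureAction {
        FAIL_TASK, // current behavior: rethrow and terminate the task
        RETRY,     // re-enqueue the message for another send attempt
        DROP       // log the failure and skip the message
    }

    /**
     * Invoked when the Pulsar producer reports a send failure, such as a
     * PulsarClientException.TimeoutException on the producer send timeout.
     */
    SendFailureAction onSendFailure(String topic, Throwable cause);
}
{code}

With something along these lines, a user could register, for example, a 
handler returning SendFailureAction.RETRY on the sink builder instead of 
having the TimeoutException escape and terminate the task manager.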

The exception is as follows:

org.apache.flink.util.FlinkRuntimeException: Failed to send data to Pulsar: 
persistent://public/default/myproducer-partition-0
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.throwSendingException(PulsarWriter.java:182)
 ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17]
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.lambda$write$0(PulsarWriter.java:172)
 ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) 
~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) 
~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) 
[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) 
[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
[flink-dist-1.17.1.jar:1.17.1]
        at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 
The producer myproducer- f4b1580b-1ea8-4c21-9d0b-da4d12ca6f93 can not send 
message to the topic persistent://public/default/myproducer-partition-0 within 
given timeout
        at 
org.apache.pulsar.client.impl.ProducerImpl.run(ProducerImpl.java:1993) 
~[pulsar-client-all-2.11.2.jar:2.11.2]
        at 
org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715)
 ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at 
org.apache.pulsar.shade.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34)
 ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at 
org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703)
 ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at 
org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790)
 ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at 
org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503)
 ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at 
org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 ~[pulsar-client-all-2.11.2.jar:2.11.2]
        ... 1 more

 





[jira] [Updated] (FLINK-33135) Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic

2023-09-23 Thread Jason Kania (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Kania updated FLINK-33135:

Description: 
For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, 
ensures that the list of partitions is empty during deployment and then 
complains when the list of partitions supplied to it is empty at runtime. The 
default TopicRouter that is created is the RoundRobinTopicRouter and it 
provides a nonsensical error for this type of TopicRouter. This error message 
issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136.

The connector should not be applying a topic router to nonpartitioned topics or 
should treat the nonpartitioned topic as a special/different case. Currently, 
the following error is raised even though the setTopics() method is called on 
the PulsarSink.builder() with a single topic.

Caused by: java.lang.IllegalArgumentException: You should provide topics for 
routing topic by message key hash.
        at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at 
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 30 more

The distinctTopics() method of the TopicNameUtils class is what ensures the 
list of partitions is empty for a nonpartitioned topic.
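
For reference, a minimal sketch of the setup that triggers this follows. The 
builder calls are the connector's documented ones; the service and admin URLs 
and the topic name are placeholders.

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.sink.PulsarSink;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;

// Sketch of the failing setup: a single non-partitioned topic is supplied
// via setTopics(), yet the default RoundRobinTopicRouter later sees an
// empty partition list at runtime and throws the IllegalArgumentException
// shown above.
public final class UnpartitionedTopicRepro {
    static PulsarSink<String> buildSink() {
        return PulsarSink.builder()
                .setServiceUrl("pulsar://localhost:6650")             // placeholder
                .setAdminUrl("http://localhost:8080")                 // placeholder
                .setTopics("persistent://public/default/myproducer")  // non-partitioned topic
                .setSerializationSchema(
                        PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }
}
{code}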

  was:
For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, 
ensures that the list of partitions is empty during deployment and then 
complains when the list of partitions supplied to it is empty at runtime. The 
default TopicRouter that is created is the RoundRobinTopicRouter and it 
provides a nonsensical error for this type of TopicRouter. This error message 
issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136.

The connector should not be applying a topic router to nonpartitioned topics or 
should treat the nonpartitioned topic as a special/different case. Currently, 
the following error is raised even though the setTopics() method is called on 
the PulsarSink.builder() with a single topic.

Caused by: java.lang.IllegalArgumentException: You should provide topics for 
routing topic by message key hash.
        at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at 
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 30 more

The distinctTopics() method of the TopicNameUtils class is what ensures the 
list of partitions is empty.


> Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic
> --
>
> Key: FLINK-33135
> URL: https://issues.apache.org/jira/browse/FLINK-33135
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.1
>Reporter: Jason Kania
>Priority: Major
>
> For a non-partitioned topic, the Flink Pulsar connector creates a 
> TopicRouter, ensures that the list of partitions is empty during deployment 
> and then complains when the list of partitions supplied to it is empty at 
> runtime. The default TopicRouter that is created is the RoundRobinTopicRouter 
> and it provides a nonsensical error for this type of TopicRouter. This error 
> message issue is raised in ticket 
> https://issues.apache.org/jira/browse/FLINK-33136.
> The connector should not be applying a topic router to nonpartitioned topics 
> or should treat the nonpartitioned topic as a special/different case. 
> Currently, the following error is raised even though the setTopics() method 
> is called on the PulsarSink.builder() with a single topic.
> Caused by: java.lang.IllegalArgumentException: You should provide topics for 
> routing topic by message key hash.
>         at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>         at 
> org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
>         at 
> 

[jira] [Updated] (FLINK-33135) Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic

2023-09-23 Thread Jason Kania (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Kania updated FLINK-33135:

Description: 
For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, 
ensures that the list of partitions is empty during deployment and then 
complains when the list of partitions supplied to it is empty at runtime. The 
default TopicRouter that is created is the RoundRobinTopicRouter and it 
provides a nonsensical error for this type of TopicRouter. This error message 
issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136.

The connector should not be applying a topic router to nonpartitioned topics or 
should treat the nonpartitioned topic as a special/different case. Currently, 
the following error is raised even though the setTopics() method is called on 
the PulsarSink.builder() with a single topic.

Caused by: java.lang.IllegalArgumentException: You should provide topics for 
routing topic by message key hash.
        at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at 
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 30 more

The distinctTopics() method of the TopicNameUtils class is what ensures the 
list of partitions is empty.

  was:
For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, 
ensures that the list of partitions is empty during deployment and then 
complains when the list of partitions supplied to it is empty at runtime. The 
default TopicRouter that is created is the RoundRobinTopicRouter and it 
provides a nonsensical error for this type of TopicRouter. This error message 
issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136.

The connector should not be applying a topic router to nonpartitioned topics or 
should treat the nonpartitioned topic as a special case. Currently, the 
following error is raised even though the setTopics() method is called on the 
PulsarSink.builder() with a single topic.

Caused by: java.lang.IllegalArgumentException: You should provide topics for 
routing topic by message key hash.
        at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at 
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 30 more

The distinctTopics() method of the TopicNameUtils class is what ensures the 
list of partitions is empty.


> Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic
> --
>
> Key: FLINK-33135
> URL: https://issues.apache.org/jira/browse/FLINK-33135
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.1
>Reporter: Jason Kania
>Priority: Major
>
> For a non-partitioned topic, the Flink Pulsar connector creates a 
> TopicRouter, ensures that the list of partitions is empty during deployment 
> and then complains when the list of partitions supplied to it is empty at 
> runtime. The default TopicRouter that is created is the RoundRobinTopicRouter 
> and it provides a nonsensical error for this type of TopicRouter. This error 
> message issue is raised in ticket 
> https://issues.apache.org/jira/browse/FLINK-33136.
> The connector should not be applying a topic router to nonpartitioned topics 
> or should treat the nonpartitioned topic as a special/different case. 
> Currently, the following error is raised even though the setTopics() method 
> is called on the PulsarSink.builder() with a single topic.
> Caused by: java.lang.IllegalArgumentException: You should provide topics for 
> routing topic by message key hash.
>         at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>         at 
> org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
>         at 
> org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
>         at 
> 

[jira] [Updated] (FLINK-33135) Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic

2023-09-23 Thread Jason Kania (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Kania updated FLINK-33135:

Description: 
For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, 
ensures that the list of partitions is empty during deployment and then 
complains when the list of partitions supplied to it is empty at runtime. The 
default TopicRouter that is created is the RoundRobinTopicRouter and it 
provides a nonsensical error for this type of TopicRouter. This error message 
issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136.

The connector should not be applying a topic router to nonpartitioned topics or 
should treat the nonpartitioned topic as a special case. Currently, the 
following error is raised even though the setTopics() method is called on the 
PulsarSink.builder() with a single topic.

Caused by: java.lang.IllegalArgumentException: You should provide topics for 
routing topic by message key hash.
        at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at 
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 30 more

The distinctTopics() method of the TopicNameUtils class is what ensures the 
list of partitions is empty.

  was:
For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, 
ensures that the list of partitions is empty during deployment and then 
complains when the list of partitions supplied to it is empty at runtime. The 
default TopicRouter that is created is the RoundRobinTopicRouter and it 
provides a nonsensical error for this type of TopicRouter. This error message 
issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136.

The connector should not be applying a topic router to nonpartitioned topics or 
should treat the nonpartitioned topic as a special case. Currently, the 
following error is raised even though the setTopics() method is called on the 
PulsarSink.builder() with a single topic.

Caused by: java.lang.IllegalArgumentException: You should provide topics for 
routing topic by message key hash.
        at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at 
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 30 more

 

 


> Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic
> --
>
> Key: FLINK-33135
> URL: https://issues.apache.org/jira/browse/FLINK-33135
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.1
>Reporter: Jason Kania
>Priority: Major
>
> For a non-partitioned topic, the Flink Pulsar connector creates a 
> TopicRouter, ensures that the list of partitions is empty during deployment 
> and then complains when the list of partitions supplied to it is empty at 
> runtime. The default TopicRouter that is created is the RoundRobinTopicRouter 
> and it provides a nonsensical error for this type of TopicRouter. This error 
> message issue is raised in ticket 
> https://issues.apache.org/jira/browse/FLINK-33136.
> The connector should not be applying a topic router to nonpartitioned topics 
> or should treat the nonpartitioned topic as a special case. Currently, the 
> following error is raised even though the setTopics() method is called on the 
> PulsarSink.builder() with a single topic.
> Caused by: java.lang.IllegalArgumentException: You should provide topics for 
> routing topic by message key hash.
>         at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>         at 
> org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
>         at 
> org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
>         at 
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
>         at 
> 

[jira] [Updated] (FLINK-33135) Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic

2023-09-23 Thread Jason Kania (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Kania updated FLINK-33135:

Description: 
For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, 
ensures that the list of partitions is empty during deployment and then 
complains when the list of partitions supplied to it is empty at runtime. The 
default TopicRouter that is created is the RoundRobinTopicRouter and it 
provides a nonsensical error for this type of TopicRouter. This error message 
issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136.

The connector should not be applying a topic router to nonpartitioned topics or 
should treat the nonpartitioned topic as a special case. Currently, the 
following error is raised even though the setTopics() method is called on the 
PulsarSink.builder() with a single topic.

Caused by: java.lang.IllegalArgumentException: You should provide topics for 
routing topic by message key hash.
        at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at 
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 30 more

 

 

  was:
For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, 
ensures that the list of partitions is empty during deployment and then 
complains when the list of partitions supplied to it is empty at runtime. The 
default TopicRouter that is created is the RoundRobinTopicRouter and it 
provides a nonsensical error for this type of TopicRouter. This error message 
issue is raised in ticket ???.

The connector should not be applying a topic router to nonpartitioned topics or 
should treat the nonpartitioned topic as a special case. Currently, the 
following error is raised even though the setTopics() method is called on the 
PulsarSink.builder() with a single topic.

Caused by: java.lang.IllegalArgumentException: You should provide topics for 
routing topic by message key hash.
        at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at 
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 30 more

 

 


> Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic
> --
>
> Key: FLINK-33135
> URL: https://issues.apache.org/jira/browse/FLINK-33135
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.1
>Reporter: Jason Kania
>Priority: Major
>
> For a non-partitioned topic, the Flink Pulsar connector creates a 
> TopicRouter, ensures that the list of partitions is empty during deployment 
> and then complains when the list of partitions supplied to it is empty at 
> runtime. The default TopicRouter that is created is the RoundRobinTopicRouter 
> and it provides a nonsensical error for this type of TopicRouter. This error 
> message issue is raised in ticket 
> https://issues.apache.org/jira/browse/FLINK-33136.
> The connector should not be applying a topic router to nonpartitioned topics 
> or should treat the nonpartitioned topic as a special case. Currently, the 
> following error is raised even though the setTopics() method is called on the 
> PulsarSink.builder() with a single topic.
> Caused by: java.lang.IllegalArgumentException: You should provide topics for 
> routing topic by message key hash.
>         at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>         at 
> org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
>         at 
> org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
>         at 
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
>         at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
>         ... 30 more
>  
>  




[jira] [Created] (FLINK-33136) Flink Pulsar Connector RoundRobinTopicRouter Generates Invalid Error Message

2023-09-23 Thread Jason Kania (Jira)
Jason Kania created FLINK-33136:
---

 Summary: Flink Pulsar Connector RoundRobinTopicRouter Generates 
Invalid Error Message
 Key: FLINK-33136
 URL: https://issues.apache.org/jira/browse/FLINK-33136
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.17.1
Reporter: Jason Kania


The RoundRobinTopicRouter class generates the runtime error "You should provide 
topics for routing topic by message key hash." when no partitions are set. This 
error is a direct copy of the error in the KeyHashTopicRouter but is 
nonsensical for a RoundRobinTopicRouter, since hashing is not involved in route 
selection.

More importantly however, this error should be detected at deploy time when the 
PulsarSink is built with the builder since the list of topics is supplied via 
the setTopics() method of the builder.

Additionally, the wording of the error is unclear in any case and could be 
improved to something like: "No partition routing topics were provided to allow 
for topic routing."

 





[jira] [Created] (FLINK-33135) Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic

2023-09-23 Thread Jason Kania (Jira)
Jason Kania created FLINK-33135:
---

 Summary: Flink Pulsar Connector Attempts Partitioned Routing on 
Unpartitioned Topic
 Key: FLINK-33135
 URL: https://issues.apache.org/jira/browse/FLINK-33135
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.17.1
Reporter: Jason Kania


For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, 
ensures that the list of partitions is empty during deployment and then 
complains when the list of partitions supplied to it is empty at runtime. The 
default TopicRouter that is created is the RoundRobinTopicRouter and it 
provides a nonsensical error for this type of TopicRouter. This error message 
issue is raised in ticket ???.

The connector should not be applying a topic router to nonpartitioned topics or 
should treat the nonpartitioned topic as a special case. Currently, the 
following error is raised even though the setTopics() method is called on the 
PulsarSink.builder() with a single topic.

Caused by: java.lang.IllegalArgumentException: You should provide topics for 
routing topic by message key hash.
        at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at 
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 30 more

 

 





[jira] [Updated] (FLINK-33111) Flink Pulsar Connector to Pulsar Client Version Mismatch

2023-09-18 Thread Jason Kania (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Kania updated FLINK-33111:

Description: 
In the documentation for the Flink Pulsar Connector, 
([https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/])
 it indicates that 2.10.0 and above versions of the pulsar client are supported 
"You can use the connector with the Pulsar 2.10.0 or higher" and the pom file 
entry references the 4.0.0-1.17 version of the connector which points to the 
2.11.0 version of the Pulsar client. However, when using Pulsar Client 2.10.4 
or 2.10.5, the following error is generated:
 
java.lang.NoSuchMethodError: 'org.apache.pulsar.client.api.ClientBuilder 
org.apache.pulsar.client.api.ClientBuilder.connectionMaxIdleSeconds(int)'
    at 
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient(PulsarClientFactory.java:127)
    at 
org.apache.flink.connector.pulsar.source.reader.PulsarSourceReader.create(PulsarSourceReader.java:266)
    at 
org.apache.flink.connector.pulsar.source.PulsarSource.createReader(PulsarSource.java:137)
    at 
org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:312)
    at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:699)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Thread.java:829)
 
Looking at the source code, the referenced method 'connectionMaxIdleSeconds' is 
only available in the Pulsar 2.11 client. I am not sure whether the 
documentation is wrong and Pulsar 2.11 is the intended client version for the 
Flink Pulsar Connector. However, my understanding is that Pulsar 2.11 is 
targeted toward Java 17. This would create the need for a mixed Java 11 and 
Java 17 deployment unless the Pulsar client code is compiled for 2.11.
 
Documentation cleanup and a reference to the appropriate Java versions are 
needed. A fix to the 1.17.1 Flink Pulsar connector may alternatively be 
required.
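
Until the documentation and version matrix are corrected, a quick way to 
confirm which client is actually on the classpath is a reflection probe like 
the one below. This is a diagnostic sketch only; the class name is invented, 
and the probed method signature is taken from the NoSuchMethodError above.

{code:java}
import org.apache.pulsar.client.api.ClientBuilder;

// Diagnostic sketch: connectionMaxIdleSeconds(int) exists only on the
// Pulsar 2.11+ ClientBuilder, so its absence means an older client jar
// (e.g. 2.10.x) is on the classpath and PulsarClientFactory.createClient
// will fail with the NoSuchMethodError shown above.
public final class PulsarClientVersionProbe {
    public static void main(String[] args) {
        try {
            ClientBuilder.class.getMethod("connectionMaxIdleSeconds", int.class);
            System.out.println("Pulsar client supports connectionMaxIdleSeconds (2.11+).");
        } catch (NoSuchMethodException e) {
            System.out.println("Older Pulsar client on the classpath; "
                    + "expect NoSuchMethodError from the 4.0.0-1.17 connector.");
        }
    }
}
{code}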

  was:
In the documentation for the Flink Pulsar Connector, 
(https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/)
 it indicates that 2.10.0 and above versions of the pulsar client are supported 
"You can use the connector with the Pulsar 2.10.0 or higher" and the pom file 
entry references the 4.0.0-1.17 version of the connector which points to the 
2.11.0 version of the Pulsar client. However, when using Pulsar Client 2.10.4 
or 2.10.5, the following error is generated:
 
java.lang.NoSuchMethodError: 'org.apache.pulsar.client.api.ClientBuilder 
org.apache.pulsar.client.api.ClientBuilder.connectionMaxIdleSeconds(int)'
    at 
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient(PulsarClientFactory.java:127)
    at 
org.apache.flink.connector.pulsar.source.reader.PulsarSourceReader.create(PulsarSourceReader.java:266)
    at 
org.apache.flink.connector.pulsar.source.PulsarSource.createReader(PulsarSource.java:137)
    at 
org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:312)
    at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:699)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Thread.java:829)
 
The referenced method 'connectionMaxIdleSeconds' is only available in the 
Pulsar 2.11 client when looking at the source code. I am not sure whether the 
documentation is wrong and the Flink Pulsar Connector 2.11 is the intended 
Pulsar version. However, my understanding is that Pulsar 2.11 is targeted 
toward java 17. In the maven repository and would create the need for mixed 
Java 11 and Java 17 deployment.
 
Documentation cleanup and a reference to the appropriate Java versions is 
needed. A fix to the 1.17.1 Flink pulsar connector may alternatively be 
required.


> Flink Pulsar Connector to Pulsar Client Version Mismatch
> 

[jira] [Created] (FLINK-33111) Flink Pulsar Connector to Pulsar Client Version Mismatch

2023-09-18 Thread Jason Kania (Jira)
Jason Kania created FLINK-33111:
---

 Summary: Flink Pulsar Connector to Pulsar Client Version Mismatch
 Key: FLINK-33111
 URL: https://issues.apache.org/jira/browse/FLINK-33111
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.17.1
Reporter: Jason Kania


The documentation for the Flink Pulsar Connector 
(https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/)
 indicates that Pulsar client versions 2.10.0 and above are supported ("You can 
use the connector with the Pulsar 2.10.0 or higher"), and the pom file entry 
references the 4.0.0-1.17 version of the connector, which points to the 2.11.0 
version of the Pulsar client. However, when using Pulsar Client 2.10.4 or 
2.10.5, the following error is generated:
 
java.lang.NoSuchMethodError: 'org.apache.pulsar.client.api.ClientBuilder 
org.apache.pulsar.client.api.ClientBuilder.connectionMaxIdleSeconds(int)'
    at 
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient(PulsarClientFactory.java:127)
    at 
org.apache.flink.connector.pulsar.source.reader.PulsarSourceReader.create(PulsarSourceReader.java:266)
    at 
org.apache.flink.connector.pulsar.source.PulsarSource.createReader(PulsarSource.java:137)
    at 
org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:312)
    at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:699)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Thread.java:829)
 
Looking at the source code, the referenced method 'connectionMaxIdleSeconds' is 
only available in the Pulsar 2.11 client. I am not sure whether the 
documentation is wrong and Pulsar 2.11 is the intended client version for the 
Flink Pulsar Connector. However, my understanding is that Pulsar 2.11, as 
published in the Maven repository, is targeted toward Java 17, which would 
create the need for a mixed Java 11 and Java 17 deployment.
 
Documentation cleanup and a reference to the appropriate Java versions are 
needed. A fix to the 1.17.1 Flink Pulsar connector may alternatively be 
required.





[jira] [Commented] (FLINK-24302) Direct buffer memory leak on Pulsar connector with Java 11

2022-11-11 Thread Jason Kania (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17632436#comment-17632436
 ] 

Jason Kania commented on FLINK-24302:
-

[~syhily] The Pulsar team is asking for the entire stack trace to be able to 
investigate the issue further 
(https://github.com/apache/pulsar/issues/17989#issuecomment-1275530307).

> Direct buffer memory leak on Pulsar connector with Java 11
> --
>
> Key: FLINK-24302
> URL: https://issues.apache.org/jira/browse/FLINK-24302
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Pulsar
>Affects Versions: 1.14.0
>Reporter: Yufan Sheng
>Priority: Major
>  Labels: test-stability
>
> Running the Pulsar connector with multiple split readers on Java 11 could 
> throw a {{java.lang.OutOfMemoryError}} exception.
> {code:java}
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException:
>  Could not complete the operation. Number of retries has been exhausted. 
> Failed reason: java.lang.OutOfMemoryError: Direct buffer memory
>   at 
> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
>   at 
> java.base/java.util.concurrent.CompletableFuture$OrApply.tryFire(CompletableFuture.java:1503)
>   ... 42 more
> Caused by: 
> org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException:
>  Could not complete the operation. Number of retries has been exhausted. 
> Failed reason: java.lang.OutOfMemoryError: Direct buffer memory
>   at 
> org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$retryOperation$4(AsyncHttpConnector.java:249)
>   ... 39 more
> Caused by: org.apache.pulsar.shade.io.netty.handler.codec.EncoderException: 
> java.lang.OutOfMemoryError: Direct buffer memory
>   at 
> org.apache.pulsar.shade.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:104)
>   at 
> org.apache.pulsar.shade.io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:346)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
>   at 
> org.apache.pulsar.shade.io.netty.handler.stream.ChunkedWriteHandler.doFlush(ChunkedWriteHandler.java:303)
>   at 
> org.apache.pulsar.shade.io.netty.handler.stream.ChunkedWriteHandler.flush(ChunkedWriteHandler.java:132)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
>   at 
> org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1020)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:311)
>   at 
> org.apache.pulsar.shade.org.asynchttpclient.netty.request.NettyRequestSender.writeRequest(NettyRequestSender.java:420)
>   ... 23 more
> {code}
> The reason is that under Java 11, Netty allocates memory from the pool of 
> Java direct memory and is affected by the MaxDirectMemory limit. Under 
> Java 8, it allocates native memory and is not affected by that setting.
> We have to reduce the direct memory usage by using a newer Pulsar client 
> which has a memory-limits configuration.
> This issue is addressed on Pulsar, and 
> [PIP-74|https://github.com/apache/pulsar/wiki/PIP-74%3A-Pulsar-client-memory-limits]
>  has been created for resolving this issue.
> We should keep this issue open with no resolved versions until Pulsar 
> provides a new client with memory limits.
> h2. Update: 2022/08/04
> The memory limit on the consumer API has been released 
> (https://github.com/apache/pulsar/pull/15216); we need to add the 
> autoScaledReceiverQueueSizeEnabled option to enable 
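
For reference, the client-side memory limit that PIP-74 introduced is 
configured on the Pulsar client builder. Below is a minimal standalone sketch; 
the 64 MB figure and the service URL are illustrative only, and how the Flink 
connector surfaces this option is not shown here.

{code:java}
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;

// Minimal sketch of the PIP-74 client memory limit; values are illustrative.
public final class MemoryLimitedClient {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // placeholder URL
                .memoryLimit(64, SizeUnit.MEGA_BYTES)    // cap client buffer memory
                .build();
        client.close();
    }
}
{code}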

[jira] [Commented] (FLINK-27611) ConcurrentModificationException during Flink-Pulsar checkpoint notification

2022-08-30 Thread Jason Kania (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17598045#comment-17598045
 ] 

Jason Kania commented on FLINK-27611:
-

Understood, unfortunately, it is a blocker for us, just not one for the larger 
Flink ecosystem.

> ConcurrentModificationException during Flink-Pulsar checkpoint notification
> ---
>
> Key: FLINK-27611
> URL: https://issues.apache.org/jira/browse/FLINK-27611
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.15.0
>Reporter: Jason Kania
>Assignee: Yufan Sheng
>Priority: Major
>  Labels: pull-request-available
>
> When attempting to run a job that was working in 1.12.7, but upgraded to 
> 1.15.0, the following exception is occurring outside of the control of my own 
> code:
>  
> java.util.ConcurrentModificationException
>     at 
> java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208)
>     at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244)
>     at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239)
>     at 
> org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader.notifyCheckpointComplete(PulsarUnorderedSourceReader.java:129)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.notifyCheckpointComplete(SourceOperator.java:511)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
>     at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:409)
>     at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:343)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1384)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$14(StreamTask.java:1325)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.base/java.lang.Thread.run(Thread.java:829)
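
For reference, the trace shows the textbook failure mode of structurally 
modifying a java.util.TreeMap while iterating over it. The standalone 
illustration below reproduces the exception alongside the safe iterator-based 
alternative; it is a generic demo, not the connector's code.

{code:java}
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.TreeMap;

// Generic demo of the failure mode behind the trace above: removing map
// entries mid-iteration triggers ConcurrentModificationException, while
// removing through the collection's own iterator (here via removeIf) is safe.
public final class TreeMapCmeDemo {
    public static void main(String[] args) {
        TreeMap<Long, String> cursors =
                new TreeMap<>(Map.of(1L, "a", 2L, "b", 3L, "c"));

        // Unsafe: structural modification during iteration.
        try {
            for (Map.Entry<Long, String> e : cursors.entrySet()) {
                cursors.remove(e.getKey());
            }
        } catch (ConcurrentModificationException expected) {
            System.out.println("Reproduced: " + expected);
        }

        // Safe: removal goes through the entry set's iterator.
        cursors.putAll(Map.of(1L, "a", 2L, "b", 3L, "c"));
        cursors.entrySet().removeIf(e -> e.getKey() <= 2L);
        System.out.println("Remaining: " + cursors);
    }
}
{code}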





[jira] [Commented] (FLINK-27611) ConcurrentModificationException during Flink-Pulsar checkpoint notification

2022-08-18 Thread Jason Kania (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17581330#comment-17581330
 ] 

Jason Kania commented on FLINK-27611:
-

May I ask why this fix was not included in the 1.15.2 release? It really blocks 
usage of 1.15 for us.

> ConcurrentModificationException during Flink-Pulsar checkpoint notification
> ---
>
> Key: FLINK-27611
> URL: https://issues.apache.org/jira/browse/FLINK-27611
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.15.0
>Reporter: Jason Kania
>Priority: Major
>  Labels: pull-request-available
>
> When attempting to run a job that was working in 1.12.7, but upgraded to 
> 1.15.0, the following exception is occurring outside of the control of my own 
> code:
>  
> java.util.ConcurrentModificationException
>     at 
> java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208)
>     at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244)
>     at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239)
>     at 
> org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader.notifyCheckpointComplete(PulsarUnorderedSourceReader.java:129)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.notifyCheckpointComplete(SourceOperator.java:511)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
>     at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:409)
>     at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:343)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1384)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$14(StreamTask.java:1325)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.base/java.lang.Thread.run(Thread.java:829)





[jira] [Commented] (FLINK-27611) ConcurrentModificationException during Flink-Pulsar checkpoint notification

2022-06-23 Thread Jason Kania (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17558211#comment-17558211
 ] 

Jason Kania commented on FLINK-27611:
-

Is there a possibility of this getting into the 1.15.1 patch release? I do not 
know the process for inclusion, but since this affects all available releases, 
it really blocks usage with Pulsar.

> ConcurrentModificationException during Flink-Pulsar checkpoint notification
> ---
>
> Key: FLINK-27611
> URL: https://issues.apache.org/jira/browse/FLINK-27611
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.15.0
>Reporter: Jason Kania
>Priority: Major
>  Labels: pull-request-available
>
> When attempting to run a job that was working in 1.12.7, but upgraded to 
> 1.15.0, the following exception is occurring outside of the control of my own 
> code:
>  
> java.util.ConcurrentModificationException
>     at 
> java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208)
>     at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244)
>     at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239)
>     at 
> org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader.notifyCheckpointComplete(PulsarUnorderedSourceReader.java:129)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.notifyCheckpointComplete(SourceOperator.java:511)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
>     at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:409)
>     at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:343)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1384)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$14(StreamTask.java:1325)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.base/java.lang.Thread.run(Thread.java:829)





[jira] [Commented] (FLINK-27611) ConcurrentModificationException during Flink-Pulsar checkpoint notification

2022-05-24 Thread Jason Kania (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541478#comment-17541478
 ] 

Jason Kania commented on FLINK-27611:
-

Does the most recent comment mean that it should go to the Pulsar project or is 
the issue with code in the Flink project?

> ConcurrentModificationException during Flink-Pulsar checkpoint notification
> ---
>
> Key: FLINK-27611
> URL: https://issues.apache.org/jira/browse/FLINK-27611
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.15.0
>Reporter: Jason Kania
>Priority: Major
>
> When attempting to run a job that was working in 1.12.7, but upgraded to 
> 1.15.0, the following exception is occurring outside of the control of my own 
> code:
>  
> java.util.ConcurrentModificationException
>     at 
> java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208)
>     at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244)
>     at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239)
>     at 
> org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader.notifyCheckpointComplete(PulsarUnorderedSourceReader.java:129)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.notifyCheckpointComplete(SourceOperator.java:511)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
>     at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:409)
>     at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:343)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1384)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$14(StreamTask.java:1325)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.base/java.lang.Thread.run(Thread.java:829)





[jira] [Updated] (FLINK-27611) ConcurrentModificationException during Flink-Pulsar checkpoint notification

2022-05-13 Thread Jason Kania (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Kania updated FLINK-27611:

Component/s: Connectors / Pulsar

> ConcurrentModificationException during Flink-Pulsar checkpoint notification
> ---
>
> Key: FLINK-27611
> URL: https://issues.apache.org/jira/browse/FLINK-27611
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.15.0
>Reporter: Jason Kania
>Priority: Major
>
> When attempting to run a job that was working in 1.12.7, but upgraded to 
> 1.15.0, the following exception is occurring outside of the control of my own 
> code:
>  
> java.util.ConcurrentModificationException
>     at 
> java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208)
>     at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244)
>     at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239)
>     at 
> org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader.notifyCheckpointComplete(PulsarUnorderedSourceReader.java:129)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.notifyCheckpointComplete(SourceOperator.java:511)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
>     at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:409)
>     at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:343)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1384)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$14(StreamTask.java:1325)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.base/java.lang.Thread.run(Thread.java:829)



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27611) ConcurrentModificationException during Flink-Pulsar checkpoint notification

2022-05-13 Thread Jason Kania (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Kania updated FLINK-27611:

Issue Type: Bug  (was: Improvement)

> ConcurrentModificationException during Flink-Pulsar checkpoint notification
> ---
>
> Key: FLINK-27611
> URL: https://issues.apache.org/jira/browse/FLINK-27611
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.15.0
>Reporter: Jason Kania
>Priority: Major
>
> When attempting to run a job that was working in 1.12.7, after upgrading to 
> 1.15.0, the following exception occurs outside of the control of my own 
> code:
>  
> java.util.ConcurrentModificationException
>     at 
> java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208)
>     at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244)
>     at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239)
>     at 
> org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader.notifyCheckpointComplete(PulsarUnorderedSourceReader.java:129)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.notifyCheckpointComplete(SourceOperator.java:511)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
>     at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:409)
>     at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:343)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1384)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$14(StreamTask.java:1325)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.base/java.lang.Thread.run(Thread.java:829)



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27611) ConcurrentModificationException during Flink-Pulsar checkpoint notification

2022-05-13 Thread Jason Kania (Jira)
Jason Kania created FLINK-27611:
---

 Summary: ConcurrentModificationException during Flink-Pulsar 
checkpoint notification
 Key: FLINK-27611
 URL: https://issues.apache.org/jira/browse/FLINK-27611
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.15.0
Reporter: Jason Kania


When attempting to run a job that was working in 1.12.7, after upgrading to 
1.15.0, the following exception occurs outside of the control of my own 
code:

 
java.util.ConcurrentModificationException
    at 
java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208)
    at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244)
    at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239)
    at 
org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader.notifyCheckpointComplete(PulsarUnorderedSourceReader.java:129)
    at 
org.apache.flink.streaming.api.operators.SourceOperator.notifyCheckpointComplete(SourceOperator.java:511)
    at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
    at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
    at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:409)
    at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:343)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1384)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$14(StreamTask.java:1325)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.base/java.lang.Thread.run(Thread.java:829)
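
For anyone landing on the same trace: the fail-fast failure inside TreeMap's 
entry iterator during notifyCheckpointComplete is the classic 
remove-while-iterating pattern. Below is a minimal sketch of the failure mode 
and the usual fix; the class, field and method names are hypothetical 
stand-ins, not the connector's actual code.

{noformat}
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class CheckpointCursorCleanup {

    // Hypothetical stand-in for the reader's per-checkpoint state:
    // checkpoint id -> cursor/transaction handle that must be committed.
    private final TreeMap<Long, String> cursorsToCommit = new TreeMap<>();

    // Buggy shape: mutating the map while iterating its entry set makes the
    // iterator fail fast with ConcurrentModificationException, as in the
    // trace above.
    public void notifyCheckpointCompleteBuggy(long checkpointId) {
        for (Map.Entry<Long, String> entry : cursorsToCommit.entrySet()) {
            if (entry.getKey() <= checkpointId) {
                commit(entry.getValue());
                cursorsToCommit.remove(entry.getKey()); // CME on next step
            }
        }
    }

    // Safe shape: remove through the iterator; since TreeMap is sorted we
    // can stop at the first entry newer than the completed checkpoint.
    public void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, String>> it =
                cursorsToCommit.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> entry = it.next();
            if (entry.getKey() > checkpointId) {
                break;
            }
            commit(entry.getValue());
            it.remove();
        }
    }

    private void commit(String cursor) {
        // placeholder for committing the Pulsar cursor/transaction
    }
}
{noformat}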



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27568) Cannot submit JobGraph

2022-05-10 Thread Jason Kania (Jira)
Jason Kania created FLINK-27568:
---

 Summary: Cannot submit JobGraph
 Key: FLINK-27568
 URL: https://issues.apache.org/jira/browse/FLINK-27568
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.15.0
Reporter: Jason Kania


I recently upgraded from 1.12.7 to 1.15.0 and encountered an issue in 
submitting jobs via the command line. The following exception gets generated 
during submission. The jobs have been submitted in the past on the same machine 
in the 1.12.7 release without issue.

org.apache.flink.util.FlinkException: Failed to execute job 'TestIngester'.
        at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2108)
        at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
        at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:119)
        at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1969)
        at com.server.processing.TestIngesterJ.main(TestIngesterJ.java:133)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
        at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078)
        at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
        at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
        at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$11(RestClusterClient.java:434)
        at 
java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
        at 
java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
        at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at 
java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
        at 
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:373)
        at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at 
java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
        at 
java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
        at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.rest.handler.RestHandlerException: Could not upload 
job files.
        at 
org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:201)
        at 
java.base/java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1236)
        at 
java.base/java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1205)
        at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
        at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at 

[jira] [Commented] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets

2020-11-06 Thread Jason Kania (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227405#comment-17227405
 ] 

Jason Kania commented on FLINK-16468:
-

Up until now, I have been redirected away from these activities. The problem 
still exists in that rapid reconnections occur, but I have not had any chance 
to investigate, and it looks like I won't for a while. If you wish to close 
this, feel free, and I will reference this issue if I am able to reopen and 
reinvestigate.

> BlobClient rapid retrieval retries on failure opens too many sockets
> 
>
> Key: FLINK-16468
> URL: https://issues.apache.org/jira/browse/FLINK-16468
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.8.3, 1.9.2, 1.10.0
> Environment: Linux Ubuntu servers on the patch-current Ubuntu release, 
> Java 8 JRE
>Reporter: Jason Kania
>Priority: Major
> Fix For: 1.12.0
>
>
> In situations where the BlobClient retrieval fails as in the following log, 
> rapid retries will exhaust the open sockets. All the retries happen within a 
> few milliseconds.
> {noformat}
> 2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - 
> Failed to fetch BLOB 
> cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7
>  from aaa-1/10.0.1.1:45145 and store it under 
> /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 
> Retrying...
> {noformat}
> The above is output repeatedly until the following error occurs:
> {noformat}
> java.io.IOException: Could not connect to BlobServer at address 
> aaa-1/10.0.1.1:45145
>  at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
>  at 
> org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)
>  at 
> org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
>  at 
> org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)
>  at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
>  at 
> org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.net.SocketException: Too many open files
>  at java.net.Socket.createImpl(Socket.java:478)
>  at java.net.Socket.connect(Socket.java:605)
>  at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
>  ... 8 more
> {noformat}
>  The retries should have some form of backoff in this situation to avoid 
> flooding the logs and exhausting other resources on the server.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets

2020-05-22 Thread Jason Kania (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17114190#comment-17114190
 ] 

Jason Kania commented on FLINK-16468:
-

Sorry [~gjy], nothing new to this point. The current economic/health situation 
has resulted in a need to redirect our efforts for the moment, so we have not 
done more testing in the short term.

> BlobClient rapid retrieval retries on failure opens too many sockets
> 
>
> Key: FLINK-16468
> URL: https://issues.apache.org/jira/browse/FLINK-16468
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.8.3, 1.9.2, 1.10.0
> Environment: Linux Ubuntu servers on the patch-current Ubuntu release, 
> Java 8 JRE
>Reporter: Jason Kania
>Priority: Major
> Fix For: 1.11.0
>
>
> In situations where the BlobClient retrieval fails as in the following log, 
> rapid retries will exhaust the open sockets. All the retries happen within a 
> few milliseconds.
> {noformat}
> 2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - 
> Failed to fetch BLOB 
> cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7
>  from aaa-1/10.0.1.1:45145 and store it under 
> /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 
> Retrying...
> {noformat}
> The above is output repeatedly until the following error occurs:
> {noformat}
> java.io.IOException: Could not connect to BlobServer at address 
> aaa-1/10.0.1.1:45145
>  at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
>  at 
> org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)
>  at 
> org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
>  at 
> org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)
>  at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
>  at 
> org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.net.SocketException: Too many open files
>  at java.net.Socket.createImpl(Socket.java:478)
>  at java.net.Socket.connect(Socket.java:605)
>  at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
>  ... 8 more
> {noformat}
>  The retries should have some form of backoff in this situation to avoid 
> flooding the logs and exhausting other resources on the server.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets

2020-03-24 Thread Jason Kania (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17066231#comment-17066231
 ] 

Jason Kania commented on FLINK-16468:
-

[~gjy], the situation is unreliable in an unpredictable way, but I will try to 
collect some details over time. With Flink, Pulsar, Zookeeper and database 
connections in the mix, we get different errors/exceptions each time there is a 
network failure (or we simulate one), depending on which component is the first 
to encounter the networking issue and on when the different components attempt 
to recover.

> BlobClient rapid retrieval retries on failure opens too many sockets
> 
>
> Key: FLINK-16468
> URL: https://issues.apache.org/jira/browse/FLINK-16468
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.8.3, 1.9.2, 1.10.0
> Environment: Linux Ubuntu servers on the patch-current Ubuntu release, 
> Java 8 JRE
>Reporter: Jason Kania
>Priority: Major
> Fix For: 1.11.0
>
>
> In situations where the BlobClient retrieval fails as in the following log, 
> rapid retries will exhaust the open sockets. All the retries happen within a 
> few milliseconds.
> {noformat}
> 2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - 
> Failed to fetch BLOB 
> cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7
>  from aaa-1/10.0.1.1:45145 and store it under 
> /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 
> Retrying...
> {noformat}
> The above is output repeatedly until the following error occurs:
> {noformat}
> java.io.IOException: Could not connect to BlobServer at address 
> aaa-1/10.0.1.1:45145
>  at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
>  at 
> org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)
>  at 
> org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
>  at 
> org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)
>  at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
>  at 
> org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.net.SocketException: Too many open files
>  at java.net.Socket.createImpl(Socket.java:478)
>  at java.net.Socket.connect(Socket.java:605)
>  at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
>  ... 8 more
> {noformat}
>  The retries should have some form of backoff in this situation to avoid 
> flooding the logs and exhausting other resources on the server.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets

2020-03-23 Thread Jason Kania (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17065174#comment-17065174
 ] 

Jason Kania commented on FLINK-16468:
-

[~gjy], in looking at my particular situation as a reference, the network 
outage seemed to be just less than one minute, and that was more than enough to 
bring Flink, the running jobs and the related queuing applications to the point 
where they were not recoverable on their own after the network recovered. What 
I hope for from a recovery strategy, and have implemented in the telecoms 
industry in the past, is that this recovery can happen on its own. The 1 second 
delay only seems to moderate the CPU utilization; it does not stop the 
applications from giving up and being left in an unknown state.

Without some form of increasing backoff, you either need to have a large number 
of retries or expect the application to give up. Since the applications will 
take at least 10 seconds to restart, the different components bouncing at the 
same time in an outage such as this means there is just too much instability 
for all the components to recover.

That said, I understand what you mean about not having control over libraries 
such as Curator. Just today, by playing with the network interface, I managed 
to trigger an unhandled exception where the zookeeper client gave up, leaving 
Flink in a strange state.

I also think that the 1.10 documentation is an improvement. The real test will 
come once we migrate to using it.

Maybe having a pluggable restart strategy for all components could better allow 
users to handle the particulars of each installation?
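
As a rough illustration of the tunable behaviour I mean: at the job level, 
newer Flink releases do expose an exponential-delay restart strategy. A 
sketch, assuming a recent Flink version, and not a claim that this covers the 
BlobClient or ZooKeeper recovery paths discussed above:

{noformat}
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Delays grow 1s, 2s, 4s, ... up to 1 minute, with 10% jitter;
        // the backoff resets after an hour without failures.
        env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
                Time.seconds(1), Time.minutes(1), 2.0, Time.hours(1), 0.1));
        // ... job definition and env.execute("...") would follow here
    }
}
{noformat}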

> BlobClient rapid retrieval retries on failure opens too many sockets
> 
>
> Key: FLINK-16468
> URL: https://issues.apache.org/jira/browse/FLINK-16468
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.8.3, 1.9.2, 1.10.0
> Environment: Linux Ubuntu servers on the patch-current Ubuntu release, 
> Java 8 JRE
>Reporter: Jason Kania
>Priority: Major
> Fix For: 1.11.0
>
>
> In situations where the BlobClient retrieval fails as in the following log, 
> rapid retries will exhaust the open sockets. All the retries happen within a 
> few milliseconds.
> {noformat}
> 2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - 
> Failed to fetch BLOB 
> cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7
>  from aaa-1/10.0.1.1:45145 and store it under 
> /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 
> Retrying...
> {noformat}
> The above is output repeatedly until the following error occurs:
> {noformat}
> java.io.IOException: Could not connect to BlobServer at address 
> aaa-1/10.0.1.1:45145
>  at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
>  at 
> org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)
>  at 
> org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
>  at 
> org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)
>  at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
>  at 
> org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.net.SocketException: Too many open files
>  at java.net.Socket.createImpl(Socket.java:478)
>  at java.net.Socket.connect(Socket.java:605)
>  at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
>  ... 8 more
> {noformat}
>  The retries should have some form of backoff in this situation to avoid 
> flooding the logs and exhausting other resources on the server.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets

2020-03-23 Thread Jason Kania (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17064899#comment-17064899
 ] 

Jason Kania commented on FLINK-16468:
-

[~gjy], to me, as a user, your restart strategy seems to lack a consolidated 
approach, relying instead on the patchwork addition of individual small 
timeouts. I ask the open question of how many more of these small timeouts may 
have to be added for different components that exhibit the same instant-restart 
approach in the face of external failures. I also question where users can 
expect to learn about each individual timer in detail and what each value 
should be. In my opinion, users are left finding and tuning these values in the 
face of infrequent failures rather than having the option to handle these 
situations strategically and proactively.

Because the remedy you suggest seems unlikely to be effective in my situation, 
I am not interested in contributing to this particular proposed solution.

> BlobClient rapid retrieval retries on failure opens too many sockets
> 
>
> Key: FLINK-16468
> URL: https://issues.apache.org/jira/browse/FLINK-16468
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.8.3, 1.9.2, 1.10.0
> Environment: Linux Ubuntu servers on the patch-current Ubuntu release, 
> Java 8 JRE
>Reporter: Jason Kania
>Priority: Major
> Fix For: 1.11.0
>
>
> In situations where the BlobClient retrieval fails as in the following log, 
> rapid retries will exhaust the open sockets. All the retries happen within a 
> few milliseconds.
> {noformat}
> 2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - 
> Failed to fetch BLOB 
> cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7
>  from aaa-1/10.0.1.1:45145 and store it under 
> /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 
> Retrying...
> {noformat}
> The above is output repeatedly until the following error occurs:
> {noformat}
> java.io.IOException: Could not connect to BlobServer at address 
> aaa-1/10.0.1.1:45145
>  at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
>  at 
> org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)
>  at 
> org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
>  at 
> org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)
>  at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
>  at 
> org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.net.SocketException: Too many open files
>  at java.net.Socket.createImpl(Socket.java:478)
>  at java.net.Socket.connect(Socket.java:605)
>  at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
>  ... 8 more
> {noformat}
>  The retries should have some form of backoff in this situation to avoid 
> flooding the logs and exhausting other resources on the server.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16475) PulsarConsumerSource messageReceiveTimeoutMs parameter should be configurable

2020-03-20 Thread Jason Kania (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17063683#comment-17063683
 ] 

Jason Kania commented on FLINK-16475:
-

Sorry, I was confused because Pulsar switched from tracking in JIRA to tracking 
in GitHub. I have opened [https://github.com/apache/pulsar/issues/6578] for 
those who might encounter this in the future.

> PulsarConsumerSource messageReceiveTimeoutMs parameter should be configurable
> -
>
> Key: FLINK-16475
> URL: https://issues.apache.org/jira/browse/FLINK-16475
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.9.2
> Environment: Not relevant
>Reporter: Jason Kania
>Priority: Major
>
> The messageReceiveTimeoutMs value in the PulsarConsumerSource class is 
> hardcoded to 100ms but should be configurable to accommodate different 
> hardware setups such as developer labs where the performance level may not be 
> as critical.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets

2020-03-18 Thread Jason Kania (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17061785#comment-17061785
 ] 

Jason Kania commented on FLINK-16468:
-

[~gjy] I will happily update the user docs, but would appreciate some input on 
what the implications might be, since my lack of experience with those 
implications was part of the reason why this issue and 
https://issues.apache.org/jira/browse/FLINK-16470 were raised in the first 
place.

If you assign this to me, I can provide a backoff implementation. However, you 
mentioned a fixed backoff time, whereas [~NicoK] and I were thinking of a 
backoff algorithm. Given the option, I would go with a backoff algorithm along 
the lines of 1, 2, 4, 8, 16... seconds, which provides both user feedback and 
some chance for network recovery.

If the BlobClient follows the same approach, then it too would use the 
'exponential' backoff, as would all the other components currently tied to the 
restart delay.
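
To make the shape of that concrete, here is a minimal sketch of a capped 
exponential backoff; the names and bounds are illustrative only, not a 
proposal for the actual BlobClient API:

{noformat}
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public final class ExponentialBackoffRetry {

    interface Attempt {
        void run() throws IOException;
    }

    // Retries with delays of 1, 2, 4, 8, 16, ... seconds, capped at
    // maxBackoffSeconds; rethrows the last failure once retries run out.
    static void retryWithBackoff(Attempt attempt, int maxRetries,
            long maxBackoffSeconds) throws IOException, InterruptedException {
        long delaySeconds = 1;
        for (int attemptNumber = 0; ; attemptNumber++) {
            try {
                attempt.run();
                return;
            } catch (IOException e) {
                if (attemptNumber >= maxRetries) {
                    throw e;
                }
                // Backing off gives the network, and sockets stuck in
                // TIME-WAIT, a chance to recover before the next attempt.
                TimeUnit.SECONDS.sleep(delaySeconds);
                delaySeconds = Math.min(delaySeconds * 2, maxBackoffSeconds);
            }
        }
    }
}
{noformat}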

> BlobClient rapid retrieval retries on failure opens too many sockets
> 
>
> Key: FLINK-16468
> URL: https://issues.apache.org/jira/browse/FLINK-16468
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.8.3, 1.9.2, 1.10.0
> Environment: Linux Ubuntu servers on the patch-current Ubuntu release, 
> Java 8 JRE
>Reporter: Jason Kania
>Priority: Major
> Fix For: 1.11.0
>
>
> In situations where the BlobClient retrieval fails as in the following log, 
> rapid retries will exhaust the open sockets. All the retries happen within a 
> few milliseconds.
> {noformat}
> 2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - 
> Failed to fetch BLOB 
> cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7
>  from aaa-1/10.0.1.1:45145 and store it under 
> /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 
> Retrying...
> {noformat}
> The above is output repeatedly until the following error occurs:
> {noformat}
> java.io.IOException: Could not connect to BlobServer at address 
> aaa-1/10.0.1.1:45145
>  at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
>  at 
> org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)
>  at 
> org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
>  at 
> org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)
>  at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
>  at 
> org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.net.SocketException: Too many open files
>  at java.net.Socket.createImpl(Socket.java:478)
>  at java.net.Socket.connect(Socket.java:605)
>  at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
>  ... 8 more
> {noformat}
>  The retries should have some form of backoff in this situation to avoid 
> flooding the logs and exhausting other resources on the server.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets

2020-03-16 Thread Jason Kania (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17060282#comment-17060282
 ] 

Jason Kania commented on FLINK-16468:
-

[~gjy] These were reported on the same day but were the result of distinct 
failures on different days.

If they are both due to the restart delay, then I would suggest more detail be 
added to the restart delay documentation, because right now its implications 
are not fully explained. Additionally, a 1 second default restart delay is 
still going to leave sockets in the TIME-WAIT state. This suggests that a 
backoff algorithm would be more appropriate than a fixed delay.

> BlobClient rapid retrieval retries on failure opens too many sockets
> 
>
> Key: FLINK-16468
> URL: https://issues.apache.org/jira/browse/FLINK-16468
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.8.3, 1.9.2, 1.10.0
> Environment: Linux Ubuntu servers on the patch-current Ubuntu release, 
> Java 8 JRE
>Reporter: Jason Kania
>Priority: Major
> Fix For: 1.11.0
>
>
> In situations where the BlobClient retrieval fails as in the following log, 
> rapid retries will exhaust the open sockets. All the retries happen within a 
> few milliseconds.
> {noformat}
> 2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - 
> Failed to fetch BLOB 
> cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7
>  from aaa-1/10.0.1.1:45145 and store it under 
> /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 
> Retrying...
> {noformat}
> The above is output repeatedly until the following error occurs:
> {noformat}
> java.io.IOException: Could not connect to BlobServer at address 
> aaa-1/10.0.1.1:45145
>  at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
>  at 
> org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)
>  at 
> org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
>  at 
> org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)
>  at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
>  at 
> org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.net.SocketException: Too many open files
>  at java.net.Socket.createImpl(Socket.java:478)
>  at java.net.Socket.connect(Socket.java:605)
>  at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
>  ... 8 more
> {noformat}
>  The retries should have some form of backoff in this situation to avoid 
> flooding the logs and exhausting other resources on the server.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets

2020-03-11 Thread Jason Kania (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17057498#comment-17057498
 ] 

Jason Kania commented on FLINK-16468:
-

[~NicoK], I will try the blob.fetch.retries setting and see.

As for the deployment, it is only 4 slots in one task manager on a 2 CPU 
system, so I was not expecting to exhaust the number of sockets either. It was 
the sheer number of retries in quick succession that seems to have done it, so 
the cause may have been all the closed but TIME-WAIT sockets at the OS level 
that were not yet available for reuse.

If the issue happens again, I will see if more information is available. 
However, a backoff algorithm does seem to be a good plan.

> BlobClient rapid retrieval retries on failure opens too many sockets
> 
>
> Key: FLINK-16468
> URL: https://issues.apache.org/jira/browse/FLINK-16468
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.2
> Environment: Linux Ubuntu servers on the patch-current Ubuntu release, 
> Java 8 JRE
>Reporter: Jason Kania
>Priority: Major
>
> In situations where the BlobClient retrieval fails as in the following log, 
> rapid retries will exhaust the open sockets. All the retries happen within a 
> few milliseconds.
> {{2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - 
> Failed to fetch BLOB 
> cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7
>  from aaa-1/10.0.1.1:45145 and store it under 
> /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 
> Retrying...}}
> The above is output repeatedly until the following error occurs:
> {{java.io.IOException: Could not connect to BlobServer at address 
> aaa-1/10.0.1.1:45145}}
> {{ at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)}}
> {{ at 
> org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)}}
> {{ at 
> org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)}}
> {{ at 
> org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)}}
> {{ at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)}}
> {{ at 
> org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)}}
> {{ at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)}}
> {{ at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)}}
> {{ at java.lang.Thread.run(Thread.java:748)}}
> {{Caused by: java.net.SocketException: Too many open files}}
> {{ at java.net.Socket.createImpl(Socket.java:478)}}
> {{ at java.net.Socket.connect(Socket.java:605)}}
> {{ at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)}}
> {{ ... 8 more}}
>  The retries should have some form of backoff in this situation to avoid 
> flooding the logs and exhausting other resources on the server.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16470) Network failure causes Checkpoint Coordinator to flood disk with exceptions

2020-03-10 Thread Jason Kania (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17056583#comment-17056583
 ] 

Jason Kania commented on FLINK-16470:
-

[~gjy], the restart strategy was just the default and had no delay, so that 
explains the rapid restart. I did not understand that the restart strategy was 
related to this interaction with Zookeeper and the RegionStrategy. Thanks for 
connecting the two. I will try with a restart strategy and see what happens the 
next time there is a similar failure.

> Network failure causes Checkpoint Coordinator to flood disk with exceptions
> ---
>
> Key: FLINK-16470
> URL: https://issues.apache.org/jira/browse/FLINK-16470
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.9.2
> Environment: Latest patch-current Ubuntu release with the latest Java 8 
> JRE.
>Reporter: Jason Kania
>Priority: Major
>
> When a networking error occurred that prevented access to the shared folder 
> mounted over NFS, the CheckpointCoordinator flooded the logs with the 
> following:
>  
> {{org.apache.flink.util.FlinkException: Could not retrieve checkpoint 158365 
> from state handle under /0158365. This indicates that the 
> retrieved state handle is broken. Try cleaning the state handle store.}}
> {{ at 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.retrieveCompletedCheckpoint(ZooKeeperCompletedCheckpointStore.java:345)}}
> {{ at 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.recover(ZooKeeperCompletedCheckpointStore.java:175)}}
> {{ at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1014)}}
> {{ at 
> org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG.resetTasks(AdaptedRestartPipelinedRegionStrategyNG.java:205)}}
> {{ at 
> org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG.lambda$createResetAndRescheduleTasksCallback$1(AdaptedRestartPipelinedRegionStrategyNG.java:149)}}
> {{ at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$scheduleWithDelay$3(FutureUtils.java:202)}}
> {{ at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$scheduleWithDelay$4(FutureUtils.java:226)}}
> {{ at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)}}
> {{ at java.util.concurrent.FutureTask.run(FutureTask.java:266)}}
> {{ at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)}}
> {{ at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)}}
> {{ at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)}}
> {{ at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)}}
> {{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)}}
> {{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)}}
> {{ at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)}}
> {{ at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)}}
> {{ at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)}}
> {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)}}
> {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)}}
> {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)}}
> {{ at akka.actor.Actor.aroundReceive(Actor.scala:517)}}
> {{ at akka.actor.Actor.aroundReceive$(Actor.scala:515)}}
> {{ at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)}}
> {{ at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)}}
> {{ at akka.actor.ActorCell.invoke(ActorCell.scala:561)}}
> {{ at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)}}
> {{ at akka.dispatch.Mailbox.run(Mailbox.scala:225)}}
> {{ at akka.dispatch.Mailbox.exec(Mailbox.scala:235)}}
> {{ at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)}}
> {{ at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)}}
> {{ at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)}}
> {{ at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)}}
> {{Caused by: java.io.FileNotFoundException: 
> /mnt/shared/completedCheckpoint53ed9d9197f7 (No such file or directory)}}
> {{ at java.io.FileInputStream.open0(Native Method)}}
> {{ at java.io.FileInputStream.open(FileInputStream.java:195)}}
> {{ at java.io.FileInputStream.<init>(FileInputStream.java:138)}}
> {{ at 
> org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)}}
> {{ at 
> 

[jira] [Commented] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets

2020-03-10 Thread Jason Kania (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17056533#comment-17056533
 ] 

Jason Kania commented on FLINK-16468:
-

[~azagrebin], the only thing I saw was a lot of repeats of the IOException 
before the last one included here, along with the SocketException that follows 
it. I did not see anything preceding them, and the logs were deleted because of 
the flood and the excess disk utilization.

The debug logs for the BlobClient are now enabled. I will update this issue if 
the error occurs again.

The blob.fetch.retries setting was not modified from its default value.

> BlobClient rapid retrieval retries on failure opens too many sockets
> 
>
> Key: FLINK-16468
> URL: https://issues.apache.org/jira/browse/FLINK-16468
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.2
> Environment: Linux Ubuntu servers on the patch-current Ubuntu release, 
> Java 8 JRE
>Reporter: Jason Kania
>Priority: Major
>
> In situations where the BlobClient retrieval fails as in the following log, 
> rapid retries will exhaust the open sockets. All the retries happen within a 
> few milliseconds.
> {{2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - 
> Failed to fetch BLOB 
> cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7
>  from aaa-1/10.0.1.1:45145 and store it under 
> /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 
> Retrying...}}
> The above is output repeatedly until the following error occurs:
> {{java.io.IOException: Could not connect to BlobServer at address 
> aaa-1/10.0.1.1:45145}}
> {{ at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)}}
> {{ at 
> org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)}}
> {{ at 
> org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)}}
> {{ at 
> org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)}}
> {{ at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)}}
> {{ at 
> org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)}}
> {{ at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)}}
> {{ at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)}}
> {{ at java.lang.Thread.run(Thread.java:748)}}
> {{Caused by: java.net.SocketException: Too many open files}}
> {{ at java.net.Socket.createImpl(Socket.java:478)}}
> {{ at java.net.Socket.connect(Socket.java:605)}}
> {{ at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)}}
> {{ ... 8 more}}
>  The retries should have some form of backoff in this situation to avoid 
> flooding the logs and exhausting other resources on the server.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16475) PulsarConsumerSource messageReceiveTimeoutMs parameter should be configurable

2020-03-06 Thread Jason Kania (Jira)
Jason Kania created FLINK-16475:
---

 Summary: PulsarConsumerSource messageReceiveTimeoutMs parameter 
should be configurable
 Key: FLINK-16475
 URL: https://issues.apache.org/jira/browse/FLINK-16475
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.9.2
 Environment: Not relevant
Reporter: Jason Kania


The messageReceiveTimeoutMs value in the PulsarConsumerSource class is 
hardcoded to 100ms but should be configurable to accommodate different hardware 
setups such as developer labs where the performance level may not be as 
critical.
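
A minimal sketch of the change being asked for; the class and parameter names 
are hypothetical simplifications rather than the connector's actual layout:

{noformat}
// Hypothetical simplification: the receive timeout becomes a constructor
// parameter, keeping the old 100 ms value as the default rather than as a
// constant baked into the poll loop.
public class ConfigurableReceiveTimeoutSource {

    private static final long DEFAULT_RECEIVE_TIMEOUT_MS = 100;

    private final long messageReceiveTimeoutMs;

    public ConfigurableReceiveTimeoutSource() {
        this(DEFAULT_RECEIVE_TIMEOUT_MS);
    }

    public ConfigurableReceiveTimeoutSource(long messageReceiveTimeoutMs) {
        this.messageReceiveTimeoutMs = messageReceiveTimeoutMs;
    }

    public long getMessageReceiveTimeoutMs() {
        return messageReceiveTimeoutMs;
    }

    // The poll loop would then pass the configured value through, e.g.
    // consumer.receive((int) timeoutMs, TimeUnit.MILLISECONDS) on the
    // Pulsar Consumer API.
}
{noformat}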



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16470) Network failure causes Checkpoint Coordinator to flood disk with exceptions

2020-03-06 Thread Jason Kania (Jira)
Jason Kania created FLINK-16470:
---

 Summary: Network failure causes Checkpoint Coordinator to flood 
disk with exceptions
 Key: FLINK-16470
 URL: https://issues.apache.org/jira/browse/FLINK-16470
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.9.2
 Environment: Latest patch-current Ubuntu release with the latest Java 8 
JRE.
Reporter: Jason Kania


When a networking error occurred that prevented access to the shared folder 
mounted over NFS, the CheckpointCoordinator flooded the logs with the following:

 

{{org.apache.flink.util.FlinkException: Could not retrieve checkpoint 158365 
from state handle under /0158365. This indicates that the retrieved 
state handle is broken. Try cleaning the state handle store.}}
{{ at 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.retrieveCompletedCheckpoint(ZooKeeperCompletedCheckpointStore.java:345)}}
{{ at 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.recover(ZooKeeperCompletedCheckpointStore.java:175)}}
{{ at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1014)}}
{{ at 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG.resetTasks(AdaptedRestartPipelinedRegionStrategyNG.java:205)}}
{{ at 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG.lambda$createResetAndRescheduleTasksCallback$1(AdaptedRestartPipelinedRegionStrategyNG.java:149)}}
{{ at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$scheduleWithDelay$3(FutureUtils.java:202)}}
{{ at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$scheduleWithDelay$4(FutureUtils.java:226)}}
{{ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)}}
{{ at java.util.concurrent.FutureTask.run(FutureTask.java:266)}}
{{ at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)}}
{{ at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)}}
{{ at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)}}
{{ at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)}}
{{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)}}
{{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)}}
{{ at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)}}
{{ at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)}}
{{ at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)}}
{{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)}}
{{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)}}
{{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)}}
{{ at akka.actor.Actor.aroundReceive(Actor.scala:517)}}
{{ at akka.actor.Actor.aroundReceive$(Actor.scala:515)}}
{{ at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)}}
{{ at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)}}
{{ at akka.actor.ActorCell.invoke(ActorCell.scala:561)}}
{{ at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)}}
{{ at akka.dispatch.Mailbox.run(Mailbox.scala:225)}}
{{ at akka.dispatch.Mailbox.exec(Mailbox.scala:235)}}
{{ at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)}}
{{ at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)}}
{{ at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)}}
{{ at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)}}
{{Caused by: java.io.FileNotFoundException: 
/mnt/shared/completedCheckpoint53ed9d9197f7 (No such file or directory)}}
{{ at java.io.FileInputStream.open0(Native Method)}}
{{ at java.io.FileInputStream.open(FileInputStream.java:195)}}
{{ at java.io.FileInputStream.<init>(FileInputStream.java:138)}}
{{ at 
org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)}}
{{ at 
org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)}}
{{ at 
org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)}}
{{ at 
org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:64)}}
{{ at 
org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:57)}}
{{ at 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.retrieveCompletedCheckpoint(ZooKeeperCompletedCheckpointStore.java:339)}}
{{ ... 32 more}}

 

The result was very high CPU utilization, high disk IO and a cascade of other 
application failures. In this situation, there should be a backoff on the 
retries so as not to bring down the whole node. Recovery 

[jira] [Updated] (FLINK-15744) Some TaskManager Task exceptions are logged as info

2020-03-06 Thread Jason Kania (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Kania updated FLINK-15744:

Summary: Some TaskManager Task exceptions are logged as info  (was: Some 
TaskManager Task exception are logged as info)

> Some TaskManager Task exceptions are logged as info
> ---
>
> Key: FLINK-15744
> URL: https://issues.apache.org/jira/browse/FLINK-15744
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration
>Affects Versions: 1.9.1
> Environment: 1.9.1 on a recent Ubuntu release
>Reporter: Jason Kania
>Assignee: Jason Kania
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> After investigating to find the cause of a submitted job alternating between 
> the deploying and running states, I tried setting the logging to info level 
> and realized that some deployment exceptions were being logged as info when 
> they really should be at warning or error level.
> In particular, the following line (at line 960) of the 1.9.1 branch of the 
> Task class logs as info:
> LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, 
> currentState, newState, cause);
> This log line always has a non-null exception cause, so it would make sense 
> to log this state transition at warning or error level so that it is more 
> easily found.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets

2020-03-06 Thread Jason Kania (Jira)
Jason Kania created FLINK-16468:
---

 Summary: BlobClient rapid retrieval retries on failure opens too 
many sockets
 Key: FLINK-16468
 URL: https://issues.apache.org/jira/browse/FLINK-16468
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.9.2
 Environment: Linux Ubuntu servers on the patch-current Ubuntu release, 
Java 8 JRE
Reporter: Jason Kania


In situations where the BlobClient retrieval fails, as in the following log, rapid retries exhaust the available sockets. All of the retries happen within a few milliseconds.

{{2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - Failed to fetch BLOB cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7 from aaa-1/10.0.1.1:45145 and store it under /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 Retrying...}}

The above is output repeatedly until the following error occurs:

{{java.io.IOException: Could not connect to BlobServer at address aaa-1/10.0.1.1:45145}}
{{ at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)}}
{{ at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)}}
{{ at org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)}}
{{ at org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)}}
{{ at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)}}
{{ at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)}}
{{ at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)}}
{{ at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)}}
{{ at java.lang.Thread.run(Thread.java:748)}}
{{Caused by: java.net.SocketException: Too many open files}}
{{ at java.net.Socket.createImpl(Socket.java:478)}}
{{ at java.net.Socket.connect(Socket.java:605)}}
{{ at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)}}
{{ ... 8 more}}

The retries should use some form of backoff in this situation to avoid flooding the logs and exhausting other resources on the server; a sketch of one possible approach follows.
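
For illustration, a minimal, generic sketch of the kind of capped exponential backoff I have in mind. The class name, the Callable wrapper, and the constants are my own invention for this sketch, not the existing BlobClient API:

{code}
import java.io.IOException;
import java.util.concurrent.Callable;

// Sketch only: a generic capped-exponential-backoff wrapper that a retry
// loop such as the BlobClient's could call instead of retrying immediately.
public final class RetryWithBackoff {

    public static <T> T run(Callable<T> action, int maxAttempts, long initialDelayMs)
            throws Exception {
        long delayMs = initialDelayMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call(); // e.g. one BLOB fetch, one socket
            } catch (IOException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up after a bounded number of sockets
                }
                System.err.printf("Attempt %d failed (%s), retrying in %d ms%n",
                        attempt, e.getMessage(), delayMs);
                Thread.sleep(delayMs);
                delayMs = Math.min(delayMs * 2, 5_000L); // exponential, capped
            }
        }
    }
}
{code}

With maxAttempts = 5 and initialDelayMs = 100, a dead BlobServer would cost at most five sockets and five log lines spread over roughly 1.5 seconds, instead of thousands within milliseconds.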

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15744) Some TaskManager Task exception are logged as info

2020-01-23 Thread Jason Kania (Jira)
Jason Kania created FLINK-15744:
---

 Summary: Some TaskManager Task exception are logged as info
 Key: FLINK-15744
 URL: https://issues.apache.org/jira/browse/FLINK-15744
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Configuration
Affects Versions: 1.9.1
 Environment: 1.9.1 on a recent Ubuntu release
Reporter: Jason Kania


After investigating to find the cause of a submitted job alternating between the deploying and running states, I tried setting the logging to info level and realized that some deployment exceptions were being logged at info level when they really should be logged at warning or error level.

In particular, the following line (at line 960) of the 1.9.1 branch of the Task class logs at info level:

LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState, cause);

This log line always has a non-null exception cause, so it would make sense to log at warning or error level for this state transition so that it is more easily found.
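
For illustration, a minimal sketch of the change I am suggesting, assuming the same variables that are in scope at that point in the Task class (LOG, taskNameWithSubtask, executionId, currentState, newState, cause); the exact level policy is of course open to discussion:

{code}
// Sketch: choose the log level based on whether a failure cause is present.
// Parameter types mirror the fields used around Task.java line 960.
private void logStateTransition(
        String taskNameWithSubtask,
        ExecutionAttemptID executionId,
        ExecutionState currentState,
        ExecutionState newState,
        Throwable cause) {
    if (cause == null) {
        // Routine transitions stay at INFO, as today.
        LOG.info("{} ({}) switched from {} to {}.",
                taskNameWithSubtask, executionId, currentState, newState);
    } else {
        // A present cause means something went wrong, so surface it at WARN
        // rather than burying the stack trace at INFO.
        LOG.warn("{} ({}) switched from {} to {}.",
                taskNameWithSubtask, executionId, currentState, newState, cause);
    }
}
{code}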



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11799) KryoSerializer/OperatorChain ignores copy failure resulting in NullPointerException

2019-04-10 Thread Jason Kania (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-11799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814589#comment-16814589 ]

Jason Kania commented on FLINK-11799:
-

Hi Liya,

 

Your approach seems reasonable.

Thanks,

Jason

> KryoSerializer/OperatorChain ignores copy failure resulting in 
> NullPointerException
> ---
>
> Key: FLINK-11799
> URL: https://issues.apache.org/jira/browse/FLINK-11799
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.2
>Reporter: Jason Kania
>Priority: Major
>
> I was encountering a problem with NullPointerExceptions: the deserialized
> object was reaching my ProcessFunction process() method implementation as a
> null value. Upon investigation, I discovered two issues with the
> implementation of the KryoSerializer copy().
> 1) The 'public T copy(T from)' method swallows the error if the kryo copy()
> call generates an exception. The code should report the copy error at least
> once, as a warning, so that the user is aware that the kryo copy() is
> failing. I understand that the code is there to handle the lack of a copy
> implementation, but given the potential inefficiency of having to write and
> read the object instead of copying it, this would seem useful information to
> share at the least. It is also important to have a warning in case the cause
> of the copy error is something that needs to be fixed.
> 2) The call to 'kryo.readObject(input, from.getClass())' does not handle the
> fact that the kryo readObject(Input input, Class aClass) method may return a
> null value if there are any issues. This could be handled with a check or
> warning in the OperatorChain.CopyingChainingOutput.pushToOperator() method,
> but it is also ignored there, allowing a null value to be passed along
> without any logged reason for the null value.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11799) KryoSerializer/OperatorChain ignores copy failure resulting in NullPointerException

2019-03-03 Thread Jason Kania (JIRA)
Jason Kania created FLINK-11799:
---

 Summary: KryoSerializer/OperatorChain ignores copy failure resulting in NullPointerException
 Key: FLINK-11799
 URL: https://issues.apache.org/jira/browse/FLINK-11799
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.7.2
Reporter: Jason Kania


I was encountering a problem with NullPointerExceptions: the deserialized object was reaching my ProcessFunction process() method implementation as a null value. Upon investigation, I discovered two issues with the implementation of the KryoSerializer copy().

1) The 'public T copy(T from)' method swallows the error if the kryo copy() call generates an exception. The code should report the copy error at least once, as a warning, so that the user is aware that the kryo copy() is failing. I understand that the code is there to handle the lack of a copy implementation, but given the potential inefficiency of having to write and read the object instead of copying it, this would seem useful information to share at the least. It is also important to have a warning in case the cause of the copy error is something that needs to be fixed.

2) The call to 'kryo.readObject(input, from.getClass())' does not handle the fact that the kryo readObject(Input input, Class aClass) method may return a null value if there are any issues. This could be handled with a check or warning in the OperatorChain.CopyingChainingOutput.pushToOperator() method, but it is also ignored there, allowing a null value to be passed along without any logged reason for the null value.
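
For illustration, a minimal sketch of the defensive behaviour I am suggesting, written against the public Kryo API. The class name and the one-time warning flag are my own inventions, and the serialize/deserialize fallback here merely stands in for the serializer's existing write-then-read path:

{code}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

// Sketch: copy via Kryo, but (a) warn the first time copy() fails and
// (b) refuse to hand a null result downstream.
public final class LoggedKryoCopy {

    private static volatile boolean copyFailureLogged = false;

    @SuppressWarnings("unchecked")
    public static <T> T copy(Kryo kryo, T from) {
        try {
            return kryo.copy(from);
        } catch (KryoException e) {
            if (!copyFailureLogged) {
                copyFailureLogged = true;
                System.err.println("kryo copy() failed, falling back to "
                        + "serialize/deserialize: " + e);
            }
            // Fallback: serialize and deserialize, as the serializer does today.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (Output output = new Output(bytes)) {
                kryo.writeObject(output, from);
            }
            T result;
            try (Input input = new Input(new ByteArrayInputStream(bytes.toByteArray()))) {
                result = kryo.readObject(input, (Class<T>) from.getClass());
            }
            if (result == null) {
                // readObject() can return null on trouble; fail loudly instead
                // of letting the null flow into user code.
                throw new IllegalStateException(
                        "Kryo round-trip returned null for " + from.getClass().getName());
            }
            return result;
        }
    }
}
{code}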



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8035) Unable to submit job when HA is enabled

2018-09-20 Thread Jason Kania (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-8035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16622792#comment-16622792 ]

Jason Kania commented on FLINK-8035:


I would need to know the specific logs you would like to see and the log configuration. The problem that I saw was that there were no logs indicating any issue, even with trace enabled. I would simply get a message at the point of timeout, without any preceding logs indicating that any messages had been sent.

> Unable to submit job when HA is enabled
> ---
>
> Key: FLINK-8035
> URL: https://issues.apache.org/jira/browse/FLINK-8035
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.4.0
> Environment: Mac OS X
>Reporter: Robert Metzger
>Priority: Critical
>
> Steps to reproduce:
> - Get Flink 1.4 (f5a0b4bdfb)
> - Get ZK (3.3.6 in this case)
> - Put the following flink-conf.yaml:
> {code}
> high-availability: zookeeper
> high-availability.storageDir: file:///tmp/flink-ha
> high-availability.zookeeper.quorum: localhost:2181
> high-availability.zookeeper.path.cluster-id: /my-namespace
> {code}
> - Start Flink, submit a job (any streaming example will do)
> The job submission will time out. On the JobManager, it seems that the job 
> submission gets stuck when trying to submit something to Zookeeper.
> In the JM UI, the job will sit there in status "CREATED"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8035) Unable to submit job when HA is enabled

2018-09-14 Thread Jason Kania (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-8035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16615275#comment-16615275 ]

Jason Kania commented on FLINK-8035:


I encountered this issue in 1.5.3 and subsequently had to roll back to 1.4.2. I had cleaned out the ZooKeeper data but the issue remained. I was unable to trace it to a known cause. I suspect a swallowed error in ZooKeeper communication, because the code that performs the actual low-level send for the job list was never executed, or at least the breakpoint there was never hit.

> Unable to submit job when HA is enabled
> ---
>
> Key: FLINK-8035
> URL: https://issues.apache.org/jira/browse/FLINK-8035
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.4.0
> Environment: Mac OS X
>Reporter: Robert Metzger
>Priority: Critical
>
> Steps to reproduce:
> - Get Flink 1.4 (f5a0b4bdfb)
> - Get ZK (3.3.6 in this case)
> - Put the following flink-conf.yaml:
> {code}
> high-availability: zookeeper
> high-availability.storageDir: file:///tmp/flink-ha
> high-availability.zookeeper.quorum: localhost:2181
> high-availability.zookeeper.path.cluster-id: /my-namespace
> {code}
> - Start Flink, submit a job (any streaming example will do)
> The job submission will time out. On the JobManager, it seems that the job 
> submission gets stuck when trying to submit something to Zookeeper.
> In the JM UI, the job will sit there in status "CREATED"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8035) Unable to submit job when HA is enabled

2018-09-05 Thread Jason Kania (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-8035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605052#comment-16605052 ]

Jason Kania commented on FLINK-8035:


This issue affects more than just submission. Many of the Flink command line calls also time out because of this issue. In my case, I am unable to upgrade ZooKeeper because of other components, so I have had to abandon HA mode.

> Unable to submit job when HA is enabled
> ---
>
> Key: FLINK-8035
> URL: https://issues.apache.org/jira/browse/FLINK-8035
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.4.0
> Environment: Mac OS X
>Reporter: Robert Metzger
>Priority: Critical
>
> Steps to reproduce:
> - Get Flink 1.4 (f5a0b4bdfb)
> - Get ZK (3.3.6 in this case)
> - Put the following flink-conf.yaml:
> {code}
> high-availability: zookeeper
> high-availability.storageDir: file:///tmp/flink-ha
> high-availability.zookeeper.quorum: localhost:2181
> high-availability.zookeeper.path.cluster-id: /my-namespace
> {code}
> - Start Flink, submit a job (any streaming example will do)
> The job submission will time out. On the JobManager, it seems that the job 
> submission gets stuck when trying to submit something to Zookeeper.
> In the JM UI, the job will sit there in status "CREATED"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9371) High Availability JobManager Registration Failure

2018-05-15 Thread Jason Kania (JIRA)
Jason Kania created FLINK-9371:
--

 Summary: High Availability JobManager Registration Failure
 Key: FLINK-9371
 URL: https://issues.apache.org/jira/browse/FLINK-9371
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.4.2
Reporter: Jason Kania


The following error happens intermittently on a 3-node cluster with 2 JobManagers configured in HA mode. When this happens, the two JobManager instances are associated with one another.

2018-05-15 19:00:06,400 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - Trying to associate with JobManager leader akka.tcp://flink@aaa-1:5/user/jobmanager
2018-05-15 19:00:06,404 WARN  org.apache.flink.runtime.jobmanager.JobManager    - Discard message LeaderSessionMessage(0bbe70c4-2642-4a08-912f-6cc09646281f,RegisterResourceManager akka://flink/user/resourcemanager-d6567c5d-85f4-4b18-8eac-cf9725d076a5) because there is currently no valid leader id known.
2018-05-15 19:00:16,418 ERROR org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - Resource manager could not register at JobManager
akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka://flink/), Path(/user/jobmanager)]] after [1 ms]. Sender[null] sent message of type "org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage".
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
    at java.lang.Thread.run(Thread.java:748)

 

Sometimes the following type of log also comes out following the previous log:

2018-05-15 19:13:47,525 WARN  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - Discard message LeaderSessionMessage(5cab29b9-10d3-4b25-b934-f06b82be15b5,TriggerRegistrationAtJobManager akka.tcp://flink@aaa-1:5/user/jobmanager) because the expected leader session ID 61075587-51da-4e58-ac4f-9ea118ccdde9 did not equal the received leader session ID 5cab29b9-10d3-4b25-b934-f06b82be15b5.
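
As a debugging aid (an assumption on my part, not a confirmed fix), lengthening the RPC ask timeout in flink-conf.yaml should at least show whether the registration eventually succeeds or the leader id simply stays stale:

{code}
# Assumed debugging value only, not a verified fix for this issue:
# give asks such as the registration message more time before an
# AskTimeoutException is raised (default: 10 s).
akka.ask.timeout: 60 s
{code}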



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)