[jira] [Created] (FLINK-33486) Pulsar Client Send Timeout Terminates TaskManager
Jason Kania created FLINK-33486: --- Summary: Pulsar Client Send Timeout Terminates TaskManager Key: FLINK-33486 URL: https://issues.apache.org/jira/browse/FLINK-33486 Project: Flink Issue Type: Bug Components: Connectors / Pulsar Affects Versions: 1.17.1 Reporter: Jason Kania Currently, when the Pulsar Producer encounters a timeout while attempting to send data, it generates an unhandled TimeoutException. This is not a reasonable way to handle the timeout. The situation should be handled gracefully, either through additional parameters that put the choice of action at the user's discretion or through a callback mechanism against which the user can write handling code. Unfortunately, right now, this causes a termination of the TaskManager, which then leads to other issues. Increasing the timeout period only makes the issue less likely; it does not ensure proper handling when the timeout does occur. The exception is as follows: org.apache.flink.util.FlinkRuntimeException: Failed to send data to Pulsar: persistent://public/default/myproducer-partition-0 at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.throwSendingException(PulsarWriter.java:182) ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17] at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.lambda$write$0(PulsarWriter.java:172) ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) ~[flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367) ~[flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352) ~[flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) ~[flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) [flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.1.jar:1.17.1] at java.lang.Thread.run(Thread.java:829) [?:?] 
Caused by: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The producer myproducer- f4b1580b-1ea8-4c21-9d0b-da4d12ca6f93 can not send message to the topic persistent://public/default/myproducer-partition-0 within given timeout at org.apache.pulsar.client.impl.ProducerImpl.run(ProducerImpl.java:1993) ~[pulsar-client-all-2.11.2.jar:2.11.2] at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715) ~[pulsar-client-all-2.11.2.jar:2.11.2] at org.apache.pulsar.shade.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34) ~[pulsar-client-all-2.11.2.jar:2.11.2] at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703) ~[pulsar-client-all-2.11.2.jar:2.11.2] at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790) ~[pulsar-client-all-2.11.2.jar:2.11.2] at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503) ~[pulsar-client-all-2.11.2.jar:2.11.2] at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[pulsar-client-all-2.11.2.jar:2.11.2] ... 1 more -- This message was sent by Atlassian Jira (v8.20.10#820010)
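For context (not part of the original report): the timeout behind this exception is the Pulsar producer's send timeout, which the plain client API exposes as sendTimeout(). The sketch below uses the raw Pulsar client rather than the Flink connector; the broker URL and topic are placeholders, and the catch block marks the spot where a user-level policy (retry, drop, dead-letter) could hook in if the connector offered the callback this ticket asks for.

{code:java}
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

public class SendTimeoutSketch {
    public static void main(String[] args) throws PulsarClientException {
        // Placeholder broker address.
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // The reported TimeoutException is raised when a message is not
        // acknowledged within the producer's send timeout (30s by default;
        // 0 disables the timer entirely).
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("persistent://public/default/myproducer")
                .sendTimeout(30, TimeUnit.SECONDS)
                .create();

        try {
            producer.send("example payload");
        } catch (PulsarClientException e) {
            // A graceful policy (retry, drop, route to a dead-letter topic)
            // would live here if the sink exposed such a hook.
            System.err.println("Send failed: " + e.getMessage());
        } finally {
            producer.close();
            client.close();
        }
    }
}
{code}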
[jira] [Updated] (FLINK-33135) Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic
[ https://issues.apache.org/jira/browse/FLINK-33135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Kania updated FLINK-33135: Description: For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, ensures that the list of partitions is empty during deployment and then complains when the list of partitions supplied to it is empty at runtime. The default TopicRouter that is created is the RoundRobinTopicRouter and it provides a nonsensical error for this type of TopicRouter. This error message issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136. The connector should not be applying a topic router to nonpartitioned topics or should treat the nonpartitioned topic as a special/different case. Currently, the following error is raised even though the setTopics() method is called on the PulsarSink.builder() with a single topic. Caused by: java.lang.IllegalArgumentException: You should provide topics for routing topic by message key hash. at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) at org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56) at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147) at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ... 30 more The distinctTopics() method of the TopicNameUtils class is what ensures the list of partitions is empty for a nonpartitioned topic. was: For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, ensures that the list of partitions is empty during deployment and then complains when the list of partitions supplied to it is empty at runtime. The default TopicRouter that is created is the RoundRobinTopicRouter and it provides a nonsensical error for this type of TopicRouter. This error message issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136. The connector should not be applying a topic router to nonpartitioned topics or should treat the nonpartitioned topic as a special/different case. Currently, the following error is raised even though the setTopics() method is called on the PulsarSink.builder() with a single topic. Caused by: java.lang.IllegalArgumentException: You should provide topics for routing topic by message key hash. at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) at org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56) at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147) at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ... 30 more The distinctTopics() method of the TopicNameUtils class is what ensures the list of partitions is empty. 
> Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic > -- > > Key: FLINK-33135 > URL: https://issues.apache.org/jira/browse/FLINK-33135 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.17.1 >Reporter: Jason Kania >Priority: Major > > For a non-partitioned topic, the Flink Pulsar connector creates a > TopicRouter, ensures that the list of partitions is empty during deployment > and then complains when the list of partitions supplied to it is empty at > runtime. The default TopicRouter that is created is the RoundRobinTopicRouter > and it provides a nonsensical error for this type of TopicRouter. This error > message issue is raised in ticket > https://issues.apache.org/jira/browse/FLINK-33136. > The connector should not be applying a topic router to nonpartitioned topics > or should treat the nonpartitioned topic as a special/different case. > Currently, the following error is raised even though the setTopics() method > is called on the PulsarSink.builder() with a single topic. > Caused by: java.lang.IllegalArgumentException: You should provide topics for > routing topic by message key hash. > at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) > at > org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56) > at >
[jira] [Updated] (FLINK-33135) Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic
[ https://issues.apache.org/jira/browse/FLINK-33135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Kania updated FLINK-33135: Description: For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, ensures that the list of partitions is empty during deployment and then complains when the list of partitions supplied to it is empty at runtime. The default TopicRouter that is created is the RoundRobinTopicRouter and it provides a nonsensical error for this type of TopicRouter. This error message issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136. The connector should not be applying a topic router to nonpartitioned topics or should treat the nonpartitioned topic as a special/different case. Currently, the following error is raised even though the setTopics() method is called on the PulsarSink.builder() with a single topic. Caused by: java.lang.IllegalArgumentException: You should provide topics for routing topic by message key hash. at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) at org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56) at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147) at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ... 30 more The distinctTopics() method of the TopicNameUtils class is what ensures the list of partitions is empty. was: For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, ensures that the list of partitions is empty during deployment and then complains when the list of partitions supplied to it is empty at runtime. The default TopicRouter that is created is the RoundRobinTopicRouter and it provides a nonsensical error for this type of TopicRouter. This error message issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136. The connector should not be applying a topic router to nonpartitioned topics or should treat the nonpartitioned topic as a special case. Currently, the following error is raised even though the setTopics() method is called on the PulsarSink.builder() with a single topic. Caused by: java.lang.IllegalArgumentException: You should provide topics for routing topic by message key hash. at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) at org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56) at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147) at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ... 30 more The distinctTopics() method of the TopicNameUtils class is what ensures the list of partitions is empty. 
> Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic > -- > > Key: FLINK-33135 > URL: https://issues.apache.org/jira/browse/FLINK-33135 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.17.1 >Reporter: Jason Kania >Priority: Major > > For a non-partitioned topic, the Flink Pulsar connector creates a > TopicRouter, ensures that the list of partitions is empty during deployment > and then complains when the list of partitions supplied to it is empty at > runtime. The default TopicRouter that is created is the RoundRobinTopicRouter > and it provides a nonsensical error for this type of TopicRouter. This error > message issue is raised in ticket > https://issues.apache.org/jira/browse/FLINK-33136. > The connector should not be applying a topic router to nonpartitioned topics > or should treat the nonpartitioned topic as a special/different case. > Currently, the following error is raised even though the setTopics() method > is called on the PulsarSink.builder() with a single topic. > Caused by: java.lang.IllegalArgumentException: You should provide topics for > routing topic by message key hash. > at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) > at > org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56) > at > org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147) > at >
[jira] [Updated] (FLINK-33135) Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic
[ https://issues.apache.org/jira/browse/FLINK-33135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Kania updated FLINK-33135: Description: For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, ensures that the list of partitions is empty during deployment and then complains when the list of partitions supplied to it is empty at runtime. The default TopicRouter that is created is the RoundRobinTopicRouter and it provides a nonsensical error for this type of TopicRouter. This error message issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136. The connector should not be applying a topic router to nonpartitioned topics or should treat the nonpartitioned topic as a special case. Currently, the following error is raised even though the setTopics() method is called on the PulsarSink.builder() with a single topic. Caused by: java.lang.IllegalArgumentException: You should provide topics for routing topic by message key hash. at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) at org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56) at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147) at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ... 30 more The distinctTopics() method of the TopicNameUtils class is what ensures the list of partitions is empty. was: For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, ensures that the list of partitions is empty during deployment and then complains when the list of partitions supplied to it is empty at runtime. The default TopicRouter that is created is the RoundRobinTopicRouter and it provides a nonsensical error for this type of TopicRouter. This error message issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136. The connector should not be applying a topic router to nonpartitioned topics or should treat the nonpartitioned topic as a special case. Currently, the following error is raised even though the setTopics() method is called on the PulsarSink.builder() with a single topic. Caused by: java.lang.IllegalArgumentException: You should provide topics for routing topic by message key hash. at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) at org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56) at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147) at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ... 30 more > Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic > -- > > Key: FLINK-33135 > URL: https://issues.apache.org/jira/browse/FLINK-33135 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.17.1 >Reporter: Jason Kania >Priority: Major > > For a non-partitioned topic, the Flink Pulsar connector creates a > TopicRouter, ensures that the list of partitions is empty during deployment > and then complains when the list of partitions supplied to it is empty at > runtime. 
The default TopicRouter that is created is the RoundRobinTopicRouter > and it provides a nonsensical error for this type of TopicRouter. This error > message issue is raised in ticket > https://issues.apache.org/jira/browse/FLINK-33136. > The connector should not be applying a topic router to nonpartitioned topics > or should treat the nonpartitioned topic as a special case. Currently, the > following error is raised even though the setTopics() method is called on the > PulsarSink.builder() with a single topic. > Caused by: java.lang.IllegalArgumentException: You should provide topics for > routing topic by message key hash. > at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) > at > org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56) > at > org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147) > at > org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) > at >
[jira] [Updated] (FLINK-33135) Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic
[ https://issues.apache.org/jira/browse/FLINK-33135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Kania updated FLINK-33135: Description: For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, ensures that the list of partitions is empty during deployment and then complains when the list of partitions supplied to it is empty at runtime. The default TopicRouter that is created is the RoundRobinTopicRouter and it provides a nonsensical error for this type of TopicRouter. This error message issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136. The connector should not be applying a topic router to nonpartitioned topics or should treat the nonpartitioned topic as a special case. Currently, the following error is raised even though the setTopics() method is called on the PulsarSink.builder() with a single topic. Caused by: java.lang.IllegalArgumentException: You should provide topics for routing topic by message key hash. at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) at org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56) at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147) at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ... 30 more was: For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, ensures that the list of partitions is empty during deployment and then complains when the list of partitions supplied to it is empty at runtime. The default TopicRouter that is created is the RoundRobinTopicRouter and it provides a nonsensical error for this type of TopicRouter. This error message issue is raised in ticket ???. The connector should not be applying a topic router to nonpartitioned topics or should treat the nonpartitioned topic as a special case. Currently, the following error is raised even though the setTopics() method is called on the PulsarSink.builder() with a single topic. Caused by: java.lang.IllegalArgumentException: You should provide topics for routing topic by message key hash. at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) at org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56) at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147) at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ... 30 more > Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic > -- > > Key: FLINK-33135 > URL: https://issues.apache.org/jira/browse/FLINK-33135 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.17.1 >Reporter: Jason Kania >Priority: Major > > For a non-partitioned topic, the Flink Pulsar connector creates a > TopicRouter, ensures that the list of partitions is empty during deployment > and then complains when the list of partitions supplied to it is empty at > runtime. The default TopicRouter that is created is the RoundRobinTopicRouter > and it provides a nonsensical error for this type of TopicRouter. 
This error > message issue is raised in ticket > https://issues.apache.org/jira/browse/FLINK-33136. > The connector should not be applying a topic router to nonpartitioned topics > or should treat the nonpartitioned topic as a special case. Currently, the > following error is raised even though the setTopics() method is called on the > PulsarSink.builder() with a single topic. > Caused by: java.lang.IllegalArgumentException: You should provide topics for > routing topic by message key hash. > at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) > at > org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56) > at > org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147) > at > org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) > ... 30 more > > -- This
[jira] [Created] (FLINK-33136) Flink Pulsar Connector RoundRobinTopicRouter Generates Invalid Error Message
Jason Kania created FLINK-33136: --- Summary: Flink Pulsar Connector RoundRobinTopicRouter Generates Invalid Error Message Key: FLINK-33136 URL: https://issues.apache.org/jira/browse/FLINK-33136 Project: Flink Issue Type: Bug Components: Connectors / Pulsar Affects Versions: 1.17.1 Reporter: Jason Kania The RoundRobinTopicRouter class generates the runtime error "You should provide topics for routing topic by message key hash." when no partitions are set. This error is a direct copy of the error in the KeyHashTopicRouter but is nonsensical for a RoundRobinTopicRouter since hashing is not involved in route selection. More importantly, however, this error should be detected at deploy time, when the PulsarSink is built, since the list of topics is supplied via the builder's setTopics() method. Additionally, the wording of the error is unclear in any case and could be improved to something like: "No partition routing topics were provided to allow for topic routing" -- This message was sent by Atlassian Jira (v8.20.10#820010)
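A hypothetical sketch of the deploy-time validation this ticket asks for; the class and method names below are invented for illustration and are not the connector's actual code, but they show the kind of check the builder could run before the job is submitted, together with wording along the lines suggested above.

{code:java}
import java.util.List;

public final class TopicRoutingValidation {

    private TopicRoutingValidation() {}

    /** Fails at build time instead of deferring to the router at runtime. */
    static void validateRoutingTopics(List<String> topics) {
        if (topics == null || topics.isEmpty()) {
            throw new IllegalArgumentException(
                    "No partition routing topics were provided to allow for topic routing. "
                            + "Supply them via setTopics() on the sink builder.");
        }
    }

    public static void main(String[] args) {
        validateRoutingTopics(List.of("persistent://public/default/my-topic")); // passes
        validateRoutingTopics(List.of()); // fails immediately, at deploy time
    }
}
{code}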
[jira] [Created] (FLINK-33135) Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic
Jason Kania created FLINK-33135: --- Summary: Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic Key: FLINK-33135 URL: https://issues.apache.org/jira/browse/FLINK-33135 Project: Flink Issue Type: Bug Components: Connectors / Pulsar Affects Versions: 1.17.1 Reporter: Jason Kania For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, ensures that the list of partitions is empty during deployment and then complains when the list of partitions supplied to it is empty at runtime. The default TopicRouter that is created is the RoundRobinTopicRouter and it provides a nonsensical error for this type of TopicRouter. This error message issue is raised in ticket ???. The connector should not be applying a topic router to nonpartitioned topics or should treat the nonpartitioned topic as a special case. Currently, the following error is raised even though the setTopics() method is called on the PulsarSink.builder() with a single topic. Caused by: java.lang.IllegalArgumentException: You should provide topics for routing topic by message key hash. at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) at org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56) at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147) at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ... 30 more -- This message was sent by Atlassian Jira (v8.20.10#820010)
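For reference, the usage that triggers the error is the documented builder pattern with a single, non-partitioned topic. The sketch below assumes the flink-connector-pulsar 4.0.0-1.17 builder API shown in the Flink 1.17 documentation; the service/admin URLs and topic name are placeholders.

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.pulsar.sink.PulsarSink;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SingleTopicSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.fromElements("a", "b", "c");

        // setTopics() receives exactly one non-partitioned topic, as described
        // in the report; the RoundRobinTopicRouter nevertheless rejects the
        // empty partition list when the first record is written.
        PulsarSink<String> sink = PulsarSink.builder()
                .setServiceUrl("pulsar://localhost:6650")
                .setAdminUrl("http://localhost:8080")
                .setTopics("persistent://public/default/myproducer")
                .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
                .build();

        input.sinkTo(sink);
        env.execute("single-topic-pulsar-sink");
    }
}
{code}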
[jira] [Updated] (FLINK-33111) Flink Pulsar Connector to Pulsar Client Version Mismatch
[ https://issues.apache.org/jira/browse/FLINK-33111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Kania updated FLINK-33111: Description: In the documentation for the Flink Pulsar Connector, ([https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/]) it indicates that 2.10.0 and above versions of the pulsar client are supported "You can use the connector with the Pulsar 2.10.0 or higher" and the pom file entry references the 4.0.0-1.17 version of the connector which points to the 2.11.0 version of the Pulsar client. However, when using Pulsar Client 2.10.4 or 2.10.5, the following error is generated: java.lang.NoSuchMethodError: 'org.apache.pulsar.client.api.ClientBuilder org.apache.pulsar.client.api.ClientBuilder.connectionMaxIdleSeconds(int)' at org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient(PulsarClientFactory.java:127) at org.apache.flink.connector.pulsar.source.reader.PulsarSourceReader.create(PulsarSourceReader.java:266) at org.apache.flink.connector.pulsar.source.PulsarSource.createReader(PulsarSource.java:137) at org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:312) at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:699) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.base/java.lang.Thread.run(Thread.java:829) The referenced method 'connectionMaxIdleSeconds' is only available in the Pulsar 2.11 client when looking at the source code. I am not sure whether the documentation is wrong and the Flink Pulsar Connector 2.11 is the intended Pulsar version. However, my understanding is that Pulsar 2.11 is targeted toward java 17. This would create the need for mixed Java 11 and Java 17 deployment unless the Pulsar client code is compiled for 2.11. Documentation cleanup and a reference to the appropriate Java versions is needed. A fix to the 1.17.1 Flink pulsar connector may alternatively be required. was: In the documentation for the Flink Pulsar Connector, (https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/) it indicates that 2.10.0 and above versions of the pulsar client are supported "You can use the connector with the Pulsar 2.10.0 or higher" and the pom file entry references the 4.0.0-1.17 version of the connector which points to the 2.11.0 version of the Pulsar client. 
However, when using Pulsar Client 2.10.4 or 2.10.5, the following error is generated: java.lang.NoSuchMethodError: 'org.apache.pulsar.client.api.ClientBuilder org.apache.pulsar.client.api.ClientBuilder.connectionMaxIdleSeconds(int)' at org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient(PulsarClientFactory.java:127) at org.apache.flink.connector.pulsar.source.reader.PulsarSourceReader.create(PulsarSourceReader.java:266) at org.apache.flink.connector.pulsar.source.PulsarSource.createReader(PulsarSource.java:137) at org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:312) at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:699) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.base/java.lang.Thread.run(Thread.java:829) The referenced method 'connectionMaxIdleSeconds' is only available in the Pulsar 2.11 client when looking at the source code. I am not sure whether the documentation is wrong and the Flink Pulsar Connector 2.11 is the intended Pulsar version. However, my understanding is that Pulsar 2.11 is targeted toward java 17. In the maven repository and would create the need for mixed Java 11 and Java 17 deployment. Documentation cleanup and a reference to the appropriate Java versions is needed. A fix to the 1.17.1 Flink pulsar connector may alternatively be required. > Flink Pulsar Connector to Pulsar Client Version Mismatch >
[jira] [Created] (FLINK-33111) Flink Pulsar Connector to Pulsar Client Version Mismatch
Jason Kania created FLINK-33111: --- Summary: Flink Pulsar Connector to Pulsar Client Version Mismatch Key: FLINK-33111 URL: https://issues.apache.org/jira/browse/FLINK-33111 Project: Flink Issue Type: Bug Components: Connectors / Pulsar Affects Versions: 1.17.1 Reporter: Jason Kania The documentation for the Flink Pulsar Connector (https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/) indicates that Pulsar client versions 2.10.0 and above are supported ("You can use the connector with the Pulsar 2.10.0 or higher"), and the pom file entry references the 4.0.0-1.17 version of the connector, which points to the 2.11.0 version of the Pulsar client. However, when using Pulsar Client 2.10.4 or 2.10.5, the following error is generated: java.lang.NoSuchMethodError: 'org.apache.pulsar.client.api.ClientBuilder org.apache.pulsar.client.api.ClientBuilder.connectionMaxIdleSeconds(int)' at org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient(PulsarClientFactory.java:127) at org.apache.flink.connector.pulsar.source.reader.PulsarSourceReader.create(PulsarSourceReader.java:266) at org.apache.flink.connector.pulsar.source.PulsarSource.createReader(PulsarSource.java:137) at org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:312) at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:699) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.base/java.lang.Thread.run(Thread.java:829) Looking at the source code, the referenced method 'connectionMaxIdleSeconds' is only available in the Pulsar 2.11 client. I am not sure whether the documentation is wrong and the Flink Pulsar Connector 2.11 is the intended Pulsar version. However, my understanding is that the Pulsar 2.11 client in the Maven repository is targeted toward Java 17, which would create the need for a mixed Java 11 and Java 17 deployment. Documentation cleanup and a reference to the appropriate Java versions are needed. A fix to the 1.17.1 Flink Pulsar connector may alternatively be required. -- This message was sent by Atlassian Jira (v8.20.10#820010)
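A small diagnostic sketch, assuming only that some pulsar-client is on the classpath: it uses reflection to check whether the loaded ClientBuilder exposes connectionMaxIdleSeconds(int), the method the 4.0.0-1.17 connector calls from PulsarClientFactory. A 2.10.x client lacks it, which is what produces the NoSuchMethodError above; a 2.11+ client has it.

{code:java}
import org.apache.pulsar.client.api.ClientBuilder;

public class PulsarClientVersionProbe {
    public static void main(String[] args) {
        try {
            // Present since Pulsar 2.11; absent in 2.10.x.
            ClientBuilder.class.getMethod("connectionMaxIdleSeconds", int.class);
            System.out.println("ClientBuilder.connectionMaxIdleSeconds(int) found: 2.11+ client on the classpath.");
        } catch (NoSuchMethodException e) {
            System.out.println("Method missing: a pre-2.11 client is loaded, so the connector's "
                    + "PulsarClientFactory will fail with NoSuchMethodError.");
        }
    }
}
{code}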
[jira] [Commented] (FLINK-24302) Direct buffer memory leak on Pulsar connector with Java 11
[ https://issues.apache.org/jira/browse/FLINK-24302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17632436#comment-17632436 ] Jason Kania commented on FLINK-24302: - [~syhily] The Pulsar team is asking for the entire stack trace to be able to investigate the issue further (https://github.com/apache/pulsar/issues/17989#issuecomment-1275530307). > Direct buffer memory leak on Pulsar connector with Java 11 > -- > > Key: FLINK-24302 > URL: https://issues.apache.org/jira/browse/FLINK-24302 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Pulsar >Affects Versions: 1.14.0 >Reporter: Yufan Sheng >Priority: Major > Labels: test-stability > > Running the Pulsar connector with multiple split readers on Java 11 could > throw {{a java.lang.OutOfMemoryError exception}}. > {code:java} > Caused by: java.util.concurrent.CompletionException: > org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException: > Could not complete the operation. Number of retries has been exhausted. > Failed reason: java.lang.OutOfMemoryError: Direct buffer memory > at > java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) > at > java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) > at > java.base/java.util.concurrent.CompletableFuture$OrApply.tryFire(CompletableFuture.java:1503) > ... 42 more > Caused by: > org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException: > Could not complete the operation. Number of retries has been exhausted. > Failed reason: java.lang.OutOfMemoryError: Direct buffer memory > at > org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$retryOperation$4(AsyncHttpConnector.java:249) > ... 
39 more > Caused by: org.apache.pulsar.shade.io.netty.handler.codec.EncoderException: > java.lang.OutOfMemoryError: Direct buffer memory > at > org.apache.pulsar.shade.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:104) > at > org.apache.pulsar.shade.io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:346) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702) > at > org.apache.pulsar.shade.io.netty.handler.stream.ChunkedWriteHandler.doFlush(ChunkedWriteHandler.java:303) > at > org.apache.pulsar.shade.io.netty.handler.stream.ChunkedWriteHandler.flush(ChunkedWriteHandler.java:132) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758) > at > org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1020) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:311) > at > org.apache.pulsar.shade.org.asynchttpclient.netty.request.NettyRequestSender.writeRequest(NettyRequestSender.java:420) > ... 23 more > {code} > The reason is that under Java 11, the Netty will allocate memory from the > pool of Java Direct Memory and is affected by the MaxDirectMemory limit. > Under Java 8, it allocates native memory and is not affected by that setting. > We have to reduce the direct memory usage by using a newer Pulsar client > which has a memory-limits configuration. > This issue is addressed on Pulsar, and > [PIP-74|https://github.com/apache/pulsar/wiki/PIP-74%3A-Pulsar-client-memory-limits] > has been created for resolving this issue. > We should keep this issue open with no resolved versions until Pulsar > provides a new client with memory limits. > h2. Update: 2022/08/04 > The memory limit on consumer API has been released > https://github.com/apache/pulsar/pull/15216, we need to add > autoScaledReceiverQueueSizeEnabled option to enable
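For context, the client-side settings the quoted update refers to look roughly like the sketch below when used against the plain Pulsar client. memoryLimit() requires a client with the PIP-74 changes and autoScaledReceiverQueueSizeEnabled() requires the change from apache/pulsar#15216; the broker URL, topic, subscription name, and the 64 MB cap are illustrative only.

{code:java}
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.api.SubscriptionType;

public class MemoryLimitedConsumerSketch {
    public static void main(String[] args) throws PulsarClientException {
        // Cap the client's buffer usage so many split readers cannot exhaust
        // -XX:MaxDirectMemorySize under Java 11.
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .memoryLimit(64, SizeUnit.MEGA_BYTES)
                .build();

        // Let the receiver queue grow only while memory headroom remains.
        Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
                .topic("persistent://public/default/example-topic")
                .subscriptionName("example-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .autoScaledReceiverQueueSizeEnabled(true)
                .subscribe();

        Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
        if (msg != null) {
            consumer.acknowledgeAsync(msg);
        }
        consumer.close();
        client.close();
    }
}
{code}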
[jira] [Commented] (FLINK-27611) ConcurrentModificationException during Flink-Pulsar checkpoint notification
[ https://issues.apache.org/jira/browse/FLINK-27611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17598045#comment-17598045 ] Jason Kania commented on FLINK-27611: - Understood, unfortunately, it is a blocker for us, just not one for the larger Flink ecosystem. > ConcurrentModificationException during Flink-Pulsar checkpoint notification > --- > > Key: FLINK-27611 > URL: https://issues.apache.org/jira/browse/FLINK-27611 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.15.0 >Reporter: Jason Kania >Assignee: Yufan Sheng >Priority: Major > Labels: pull-request-available > > When attempting to run a job that was working in 1.12.7, but upgraded to > 1.15.0, the following exception is occurring outside of the control of my own > code: > > java.util.ConcurrentModificationException > at > java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208) > at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244) > at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239) > at > org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader.notifyCheckpointComplete(PulsarUnorderedSourceReader.java:129) > at > org.apache.flink.streaming.api.operators.SourceOperator.notifyCheckpointComplete(SourceOperator.java:511) > at > org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:409) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:343) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1384) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$14(StreamTask.java:1325) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Thread.java:829) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-27611) ConcurrentModificationException during Flink-Pulsar checkpoint notification
[ https://issues.apache.org/jira/browse/FLINK-27611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17581330#comment-17581330 ] Jason Kania commented on FLINK-27611: - May I ask why this fix was not included in the 1.15.2 release? It really blocks usage of 1.15 for us. > ConcurrentModificationException during Flink-Pulsar checkpoint notification > --- > > Key: FLINK-27611 > URL: https://issues.apache.org/jira/browse/FLINK-27611 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.15.0 >Reporter: Jason Kania >Priority: Major > Labels: pull-request-available > > When attempting to run a job that was working in 1.12.7, but upgraded to > 1.15.0, the following exception is occurring outside of the control of my own > code: > > java.util.ConcurrentModificationException > at > java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208) > at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244) > at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239) > at > org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader.notifyCheckpointComplete(PulsarUnorderedSourceReader.java:129) > at > org.apache.flink.streaming.api.operators.SourceOperator.notifyCheckpointComplete(SourceOperator.java:511) > at > org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:409) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:343) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1384) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$14(StreamTask.java:1325) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Thread.java:829) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-27611) ConcurrentModificationException during Flink-Pulsar checkpoint notification
[ https://issues.apache.org/jira/browse/FLINK-27611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17558211#comment-17558211 ] Jason Kania commented on FLINK-27611: - Is there a possibility of this getting into the 1.15.1 patch release? I do not know the process for inclusion, but since this affects all available releases, it really blocks usage with Pulsar. > ConcurrentModificationException during Flink-Pulsar checkpoint notification > --- > > Key: FLINK-27611 > URL: https://issues.apache.org/jira/browse/FLINK-27611 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.15.0 >Reporter: Jason Kania >Priority: Major > Labels: pull-request-available > > When attempting to run a job that was working in 1.12.7, but upgraded to > 1.15.0, the following exception is occurring outside of the control of my own > code: > > java.util.ConcurrentModificationException > at > java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208) > at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244) > at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239) > at > org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader.notifyCheckpointComplete(PulsarUnorderedSourceReader.java:129) > at > org.apache.flink.streaming.api.operators.SourceOperator.notifyCheckpointComplete(SourceOperator.java:511) > at > org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:409) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:343) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1384) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$14(StreamTask.java:1325) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Thread.java:829) -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-27611) ConcurrentModificationException during Flink-Pulsar checkpoint notification
[ https://issues.apache.org/jira/browse/FLINK-27611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17541478#comment-17541478 ] Jason Kania commented on FLINK-27611: - Does the most recent comment mean that it should go to the Pulsar project or is the issue with code in the Flink project? > ConcurrentModificationException during Flink-Pulsar checkpoint notification > --- > > Key: FLINK-27611 > URL: https://issues.apache.org/jira/browse/FLINK-27611 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.15.0 >Reporter: Jason Kania >Priority: Major > > When attempting to run a job that was working in 1.12.7, but upgraded to > 1.15.0, the following exception is occurring outside of the control of my own > code: > > java.util.ConcurrentModificationException > at > java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208) > at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244) > at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239) > at > org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader.notifyCheckpointComplete(PulsarUnorderedSourceReader.java:129) > at > org.apache.flink.streaming.api.operators.SourceOperator.notifyCheckpointComplete(SourceOperator.java:511) > at > org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:409) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:343) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1384) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$14(StreamTask.java:1325) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Thread.java:829) -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (FLINK-27611) ConcurrentModificationException during Flink-Pulsar checkpoint notification
[ https://issues.apache.org/jira/browse/FLINK-27611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Kania updated FLINK-27611: Component/s: Connectors / Pulsar > ConcurrentModificationException during Flink-Pulsar checkpoint notification > --- > > Key: FLINK-27611 > URL: https://issues.apache.org/jira/browse/FLINK-27611 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.15.0 >Reporter: Jason Kania >Priority: Major > > When attempting to run a job that was working in 1.12.7, but upgraded to > 1.15.0, the following exception is occurring outside of the control of my own > code: > > java.util.ConcurrentModificationException > at > java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208) > at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244) > at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239) > at > org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader.notifyCheckpointComplete(PulsarUnorderedSourceReader.java:129) > at > org.apache.flink.streaming.api.operators.SourceOperator.notifyCheckpointComplete(SourceOperator.java:511) > at > org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:409) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:343) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1384) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$14(StreamTask.java:1325) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Thread.java:829) -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (FLINK-27611) ConcurrentModificationException during Flink-Pulsar checkpoint notification
[ https://issues.apache.org/jira/browse/FLINK-27611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Kania updated FLINK-27611: Issue Type: Bug (was: Improvement) > ConcurrentModificationException during Flink-Pulsar checkpoint notification > --- > > Key: FLINK-27611 > URL: https://issues.apache.org/jira/browse/FLINK-27611 > Project: Flink > Issue Type: Bug >Affects Versions: 1.15.0 >Reporter: Jason Kania >Priority: Major > > When attempting to run a job that was working in 1.12.7, but upgraded to > 1.15.0, the following exception is occurring outside of the control of my own > code: > > java.util.ConcurrentModificationException > at > java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208) > at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244) > at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239) > at > org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader.notifyCheckpointComplete(PulsarUnorderedSourceReader.java:129) > at > org.apache.flink.streaming.api.operators.SourceOperator.notifyCheckpointComplete(SourceOperator.java:511) > at > org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:409) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:343) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1384) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$14(StreamTask.java:1325) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Thread.java:829) -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27611) ConcurrentModificationException during Flink-Pulsar checkpoint notification
Jason Kania created FLINK-27611: --- Summary: ConcurrentModificationException during Flink-Pulsar checkpoint notification Key: FLINK-27611 URL: https://issues.apache.org/jira/browse/FLINK-27611 Project: Flink Issue Type: Improvement Affects Versions: 1.15.0 Reporter: Jason Kania When attempting to run a job that was working in 1.12.7, but upgraded to 1.15.0, the following exception is occurring outside of the control of my own code: java.util.ConcurrentModificationException at java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208) at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244) at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239) at org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader.notifyCheckpointComplete(PulsarUnorderedSourceReader.java:129) at org.apache.flink.streaming.api.operators.SourceOperator.notifyCheckpointComplete(SourceOperator.java:511) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145) at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:409) at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:343) at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1384) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$14(StreamTask.java:1325) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.base/java.lang.Thread.run(Thread.java:829) -- This message was sent by Atlassian Jira (v8.20.7#820007)
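The trace above fails inside a TreeMap entry iterator while checkpoint completion is being processed. As a minimal, self-contained illustration of that failure mode (not the connector's actual cursor-commit code), the sketch below shows how mutating a TreeMap while iterating over it triggers a ConcurrentModificationException, and how removing entries through the iterator avoids it.

{code:java}
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class TreeMapCmeDemo {
    public static void main(String[] args) {
        // Checkpoint id -> per-checkpoint state (a stand-in for cursors awaiting commit).
        TreeMap<Long, String> pendingCursors = new TreeMap<>();
        for (long id = 1; id <= 5; id++) {
            pendingCursors.put(id, "cursor-" + id);
        }
        long completedCheckpointId = 3L;

        // Broken pattern: removing from the map while a for-each iteration is in
        // progress makes the fail-fast iterator throw ConcurrentModificationException.
        try {
            for (Map.Entry<Long, String> entry : pendingCursors.entrySet()) {
                if (entry.getKey() <= completedCheckpointId) {
                    pendingCursors.remove(entry.getKey());
                }
            }
        } catch (java.util.ConcurrentModificationException e) {
            System.out.println("Mutation during iteration failed: " + e);
        }

        // Safe pattern: remove through the iterator itself.
        Iterator<Map.Entry<Long, String>> it = pendingCursors.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getKey() <= completedCheckpointId) {
                it.remove();
            }
        }
        System.out.println("Remaining checkpoints: " + pendingCursors.keySet());
    }
}
{code}

Whether the interfering mutation comes from the same loop or from another thread, the fail-fast behaviour and the remedies (mutate only through the iterator, or guard the shared map) are the same.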
[jira] [Created] (FLINK-27568) Cannot submit JobGraph
Jason Kania created FLINK-27568: --- Summary: Cannot submit JobGraph Key: FLINK-27568 URL: https://issues.apache.org/jira/browse/FLINK-27568 Project: Flink Issue Type: Bug Affects Versions: 1.15.0 Reporter: Jason Kania I recently upgraded from 1.12.7 to 1.15.0 and encountered an issue in submitting jobs via the command line. The following exception gets generated during submission. The jobs have been submitted in the past on the same machine in the 1.12.7 release without issue. ```org.apache.flink.util.FlinkException: Failed to execute job 'TestIngester'. at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2108) at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188) at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:119) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1969) at com.server.processing.TestIngesterJ.main(TestIngesterJ.java:133) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156) Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. 
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$11(RestClusterClient.java:434) at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:373) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Could not upload job files. at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:201) at java.base/java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1236) at java.base/java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1205) at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at
[jira] [Commented] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets
[ https://issues.apache.org/jira/browse/FLINK-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227405#comment-17227405 ] Jason Kania commented on FLINK-16468: - Up until now, I have been redirected from these activities. The problem still exists in that rapid reconnections occur, but I have not had any chance to investigate and looks like I won't for a while. If you wish to close, feel free and I will reference this issue if I am able to open and reinvestigate. > BlobClient rapid retrieval retries on failure opens too many sockets > > > Key: FLINK-16468 > URL: https://issues.apache.org/jira/browse/FLINK-16468 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.8.3, 1.9.2, 1.10.0 > Environment: Linux ubuntu servers running, patch current latest > Ubuntu patch current release java 8 JRE >Reporter: Jason Kania >Priority: Major > Fix For: 1.12.0 > > > In situations where the BlobClient retrieval fails as in the following log, > rapid retries will exhaust the open sockets. All the retries happen within a > few milliseconds. > {noformat} > 2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - > Failed to fetch BLOB > cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7 > from aaa-1/10.0.1.1:45145 and store it under > /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 > Retrying... > {noformat} > The above is output repeatedly until the following error occurs: > {noformat} > java.io.IOException: Could not connect to BlobServer at address > aaa-1/10.0.1.1:45145 > at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:100) > at > org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143) > at > org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181) > at > org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202) > at > org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120) > at > org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.net.SocketException: Too many open files > at java.net.Socket.createImpl(Socket.java:478) > at java.net.Socket.connect(Socket.java:605) > at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:95) > ... 8 more > {noformat} > The retries should have some form of backoff in this situation to avoid > flooding the logs and exhausting other resources on the server. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets
[ https://issues.apache.org/jira/browse/FLINK-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17114190#comment-17114190 ] Jason Kania commented on FLINK-16468: - Sorry [~gjy], not to this point. The current economic/health situation has resulted in a need to redirect our efforts for the moment. We have not done more testing in the short term. > BlobClient rapid retrieval retries on failure opens too many sockets > > > Key: FLINK-16468 > URL: https://issues.apache.org/jira/browse/FLINK-16468 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.8.3, 1.9.2, 1.10.0 > Environment: Linux ubuntu servers running, patch current latest > Ubuntu patch current release java 8 JRE >Reporter: Jason Kania >Priority: Major > Fix For: 1.11.0 > > > In situations where the BlobClient retrieval fails as in the following log, > rapid retries will exhaust the open sockets. All the retries happen within a > few milliseconds. > {noformat} > 2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - > Failed to fetch BLOB > cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7 > from aaa-1/10.0.1.1:45145 and store it under > /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 > Retrying... > {noformat} > The above is output repeatedly until the following error occurs: > {noformat} > java.io.IOException: Could not connect to BlobServer at address > aaa-1/10.0.1.1:45145 > at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:100) > at > org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143) > at > org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181) > at > org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202) > at > org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120) > at > org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.net.SocketException: Too many open files > at java.net.Socket.createImpl(Socket.java:478) > at java.net.Socket.connect(Socket.java:605) > at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:95) > ... 8 more > {noformat} > The retries should have some form of backoff in this situation to avoid > flooding the logs and exhausting other resources on the server. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets
[ https://issues.apache.org/jira/browse/FLINK-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17066231#comment-17066231 ] Jason Kania commented on FLINK-16468: - [~gjy], the situation is unreliably unreliable, but I will try to collect some details over time. With Flink, Pulsar, Zookeeper and database connections in the mix, we get different errors/exceptions each time there is a network failure or we simulate one depending on which component is the first to encounter the networking issue and when different components attempt to recover. > BlobClient rapid retrieval retries on failure opens too many sockets > > > Key: FLINK-16468 > URL: https://issues.apache.org/jira/browse/FLINK-16468 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.8.3, 1.9.2, 1.10.0 > Environment: Linux ubuntu servers running, patch current latest > Ubuntu patch current release java 8 JRE >Reporter: Jason Kania >Priority: Major > Fix For: 1.11.0 > > > In situations where the BlobClient retrieval fails as in the following log, > rapid retries will exhaust the open sockets. All the retries happen within a > few milliseconds. > {noformat} > 2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - > Failed to fetch BLOB > cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7 > from aaa-1/10.0.1.1:45145 and store it under > /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 > Retrying... > {noformat} > The above is output repeatedly until the following error occurs: > {noformat} > java.io.IOException: Could not connect to BlobServer at address > aaa-1/10.0.1.1:45145 > at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:100) > at > org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143) > at > org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181) > at > org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202) > at > org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120) > at > org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.net.SocketException: Too many open files > at java.net.Socket.createImpl(Socket.java:478) > at java.net.Socket.connect(Socket.java:605) > at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:95) > ... 8 more > {noformat} > The retries should have some form of backoff in this situation to avoid > flooding the logs and exhausting other resources on the server. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets
[ https://issues.apache.org/jira/browse/FLINK-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17065174#comment-17065174 ] Jason Kania commented on FLINK-16468: - [~gjy], In looking at my particular situation as a reference, the network outage seemed to be just less than one minute and that was more than enough to bring Flink, the running jobs and the related queuing applications to the point where they were not recoverable on their own after the network recovered. What I hope from a recovery strategy, and have implemented in the telecoms industry in the past, is that this recovery can happen on its own. The 1 second delay only seems to moderate the CPU utilization but not help with the applications giving up and being left in an unknown state. Without some form of increasing backoff, you either need to have a large number of retries or expect the application to give up. Since the applications will take at least 10 seconds to restart, the different components bouncing at the same time in an outage such as this means there is just too much instability for all the components to recover. That said, I understand what you mean about not having control over the libraries such as Curator, for example. Just today, I managed to create an unhandled exception where the zookeeper client gave up, leaving Flink in a funny state by playing with the network interface. I also think that the 1.10 documentation is an improvement. The test will be once we migrate to using it. Maybe having a pluggable restart strategy for all components could better allow users to handle the particulars of each installation? > BlobClient rapid retrieval retries on failure opens too many sockets > > > Key: FLINK-16468 > URL: https://issues.apache.org/jira/browse/FLINK-16468 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.8.3, 1.9.2, 1.10.0 > Environment: Linux ubuntu servers running, patch current latest > Ubuntu patch current release java 8 JRE >Reporter: Jason Kania >Priority: Major > Fix For: 1.11.0 > > > In situations where the BlobClient retrieval fails as in the following log, > rapid retries will exhaust the open sockets. All the retries happen within a > few milliseconds. > {noformat} > 2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - > Failed to fetch BLOB > cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7 > from aaa-1/10.0.1.1:45145 and store it under > /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 > Retrying... 
> {noformat} > The above is output repeatedly until the following error occurs: > {noformat} > java.io.IOException: Could not connect to BlobServer at address > aaa-1/10.0.1.1:45145 > at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:100) > at > org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143) > at > org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181) > at > org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202) > at > org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120) > at > org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.net.SocketException: Too many open files > at java.net.Socket.createImpl(Socket.java:478) > at java.net.Socket.connect(Socket.java:605) > at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:95) > ... 8 more > {noformat} > The retries should have some form of backoff in this situation to avoid > flooding the logs and exhausting other resources on the server. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets
[ https://issues.apache.org/jira/browse/FLINK-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17064899#comment-17064899 ] Jason Kania commented on FLINK-16468: - [~gjy], to me, as a user, your restart strategy seems to lack a consolidate approach with the patchwork addition of individual small timeouts. I ask the open question of how many more of the small timeouts may have to be added for different components that exhibit the same instant restart approach in the face of external failures? I also question where users can expect to learn about each individual timer in detail and what each should be. In my opinion, users are left finding and tuning these values in the face of infrequent failures rather than having the option to strategically handle these situations proactively. Because the remedy you suggest seems unlikely to be effective in my situation, I am not interested in contributing to this particular proposed solution. > BlobClient rapid retrieval retries on failure opens too many sockets > > > Key: FLINK-16468 > URL: https://issues.apache.org/jira/browse/FLINK-16468 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.8.3, 1.9.2, 1.10.0 > Environment: Linux ubuntu servers running, patch current latest > Ubuntu patch current release java 8 JRE >Reporter: Jason Kania >Priority: Major > Fix For: 1.11.0 > > > In situations where the BlobClient retrieval fails as in the following log, > rapid retries will exhaust the open sockets. All the retries happen within a > few milliseconds. > {noformat} > 2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - > Failed to fetch BLOB > cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7 > from aaa-1/10.0.1.1:45145 and store it under > /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 > Retrying... > {noformat} > The above is output repeatedly until the following error occurs: > {noformat} > java.io.IOException: Could not connect to BlobServer at address > aaa-1/10.0.1.1:45145 > at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:100) > at > org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143) > at > org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181) > at > org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202) > at > org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120) > at > org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.net.SocketException: Too many open files > at java.net.Socket.createImpl(Socket.java:478) > at java.net.Socket.connect(Socket.java:605) > at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:95) > ... 8 more > {noformat} > The retries should have some form of backoff in this situation to avoid > flooding the logs and exhausting other resources on the server. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16475) PulsarConsumerSource messageReceiveTimeoutMs parameter should be configurable
[ https://issues.apache.org/jira/browse/FLINK-16475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17063683#comment-17063683 ] Jason Kania commented on FLINK-16475: - Sorry, I was confused because Pulsar switched from tracking in JIRA to tracking in GitHub. I have opened [https://github.com/apache/pulsar/issues/6578] for those who might encounter this in the future. > PulsarConsumerSource messageReceiveTimeoutMs parameter should be configurable > - > > Key: FLINK-16475 > URL: https://issues.apache.org/jira/browse/FLINK-16475 > Project: Flink > Issue Type: Bug >Affects Versions: 1.9.2 > Environment: Not relevant >Reporter: Jason Kania >Priority: Major > > The messageReceiveTimeoutMs value in the PulsarConsumerSource class is > hardcoded to 100ms but should be configurable to accommodate different > hardware setups such as developer labs where the performance level may not be > as critical. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets
[ https://issues.apache.org/jira/browse/FLINK-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17061785#comment-17061785 ] Jason Kania commented on FLINK-16468: - [~gjy] I will happily update the user docs, but would appreciate some input on what the implications might be since my lack of experience on the implications was the part of the reason why this issue and https://issues.apache.org/jira/browse/FLINK-16470 were raised in the first place. If you assign this to me, I can provide a backoff implementation. However, you mentioned a backoff time versus backoff algorithm as [~NicoK] mentioned and I was thinking. Given the option, I would go with a backoff algorithm going something like 1,2,4,8,16... seconds which provides both user feedback and some chance for network recovery. If the BlobClient follows the same approach, then it too would use the 'exponential' backoff as would all the components also currently tied to the restart delay. > BlobClient rapid retrieval retries on failure opens too many sockets > > > Key: FLINK-16468 > URL: https://issues.apache.org/jira/browse/FLINK-16468 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.8.3, 1.9.2, 1.10.0 > Environment: Linux ubuntu servers running, patch current latest > Ubuntu patch current release java 8 JRE >Reporter: Jason Kania >Priority: Major > Fix For: 1.11.0 > > > In situations where the BlobClient retrieval fails as in the following log, > rapid retries will exhaust the open sockets. All the retries happen within a > few milliseconds. > {noformat} > 2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - > Failed to fetch BLOB > cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7 > from aaa-1/10.0.1.1:45145 and store it under > /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 > Retrying... > {noformat} > The above is output repeatedly until the following error occurs: > {noformat} > java.io.IOException: Could not connect to BlobServer at address > aaa-1/10.0.1.1:45145 > at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:100) > at > org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143) > at > org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181) > at > org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202) > at > org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120) > at > org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.net.SocketException: Too many open files > at java.net.Socket.createImpl(Socket.java:478) > at java.net.Socket.connect(Socket.java:605) > at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:95) > ... 8 more > {noformat} > The retries should have some form of backoff in this situation to avoid > flooding the logs and exhausting other resources on the server. -- This message was sent by Atlassian Jira (v8.3.4#803005)
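To make the 1, 2, 4, 8, 16 second suggestion concrete, the following is a generic retry-loop sketch with doubling delays. It is illustrative only, not the BlobClient code; the retryWithBackoff helper and its parameters are invented for the example.

{code:java}
import java.io.IOException;
import java.util.concurrent.Callable;

public final class BackoffRetry {

    /**
     * Runs the action, retrying on IOException with delays of 1, 2, 4, 8, 16, ... seconds,
     * capped at maxDelaySeconds, for at most maxAttempts attempts.
     */
    public static <T> T retryWithBackoff(Callable<T> action, int maxAttempts, long maxDelaySeconds)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delaySeconds = 1;
        IOException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (IOException e) {
                lastFailure = e;
                if (attempt == maxAttempts) {
                    break;
                }
                // Wait before reconnecting so a brief outage does not burn through
                // ephemeral sockets or flood the logs with identical failures.
                Thread.sleep(delaySeconds * 1000L);
                delaySeconds = Math.min(delaySeconds * 2, maxDelaySeconds);
            }
        }
        throw lastFailure;
    }
}
{code}

A caller would wrap the failing operation, for example retryWithBackoff(() -> downloadBlob(blobKey), 6, 30), where downloadBlob is a hypothetical stand-in for whatever fetch routine is being retried.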
[jira] [Commented] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets
[ https://issues.apache.org/jira/browse/FLINK-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17060282#comment-17060282 ] Jason Kania commented on FLINK-16468: - [~gjy] These were reported on the same day but were the result of distinct failures on different days. If they are both due to the restart delay then I would suggest more detail be added to the restart delay documentation text because right now its implications are not fully explained. Additionally, a 1 second default restart delay is still going to lead to sockets in the TIME-WAIT state. This would suggest that a backoff algorithm would be more appropriate than a fixed delay. > BlobClient rapid retrieval retries on failure opens too many sockets > > > Key: FLINK-16468 > URL: https://issues.apache.org/jira/browse/FLINK-16468 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.8.3, 1.9.2, 1.10.0 > Environment: Linux ubuntu servers running, patch current latest > Ubuntu patch current release java 8 JRE >Reporter: Jason Kania >Priority: Major > Fix For: 1.11.0 > > > In situations where the BlobClient retrieval fails as in the following log, > rapid retries will exhaust the open sockets. All the retries happen within a > few milliseconds. > {noformat} > 2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - > Failed to fetch BLOB > cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7 > from aaa-1/10.0.1.1:45145 and store it under > /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 > Retrying... > {noformat} > The above is output repeatedly until the following error occurs: > {noformat} > java.io.IOException: Could not connect to BlobServer at address > aaa-1/10.0.1.1:45145 > at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:100) > at > org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143) > at > org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181) > at > org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202) > at > org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120) > at > org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.net.SocketException: Too many open files > at java.net.Socket.createImpl(Socket.java:478) > at java.net.Socket.connect(Socket.java:605) > at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:95) > ... 8 more > {noformat} > The retries should have some form of backoff in this situation to avoid > flooding the logs and exhausting other resources on the server. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets
[ https://issues.apache.org/jira/browse/FLINK-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17057498#comment-17057498 ] Jason Kania commented on FLINK-16468: - [~NicoK], I will try the blob.fetch.retries settings and see. As for the deployment, it is only 4 slots in one task manager on a 2 CPU system so was not expecting to exhaust the number of sockets either. It was the sheer number of retries really quickly that seems to have done it so it may have been all the closed but TIME-WAIT status socket connections at the OS level that were not yet available for reuse that was causing the issue. If the issue happens again, I will see if more information is available. However, a backoff algorithm does seem to be a good plan. > BlobClient rapid retrieval retries on failure opens too many sockets > > > Key: FLINK-16468 > URL: https://issues.apache.org/jira/browse/FLINK-16468 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.9.2 > Environment: Linux ubuntu servers running, patch current latest > Ubuntu patch current release java 8 JRE >Reporter: Jason Kania >Priority: Major > > In situations where the BlobClient retrieval fails as in the following log, > rapid retries will exhaust the open sockets. All the retries happen within a > few milliseconds. > {{2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - > Failed to fetch BLOB > cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7 > from aaa-1/10.0.1.1:45145 and store it under > /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 > Retrying...}} > The above is output repeatedly until the following error occurs: > {{java.io.IOException: Could not connect to BlobServer at address > aaa-1/10.0.1.1:45145}} > {{ at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:100)}} > {{ at > org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)}} > {{ at > org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)}} > {{ at > org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)}} > {{ at > org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)}} > {{ at > org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)}} > {{ at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)}} > {{ at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)}} > {{ at java.lang.Thread.run(Thread.java:748)}} > {{Caused by: java.net.SocketException: Too many open files}} > {{ at java.net.Socket.createImpl(Socket.java:478)}} > {{ at java.net.Socket.connect(Socket.java:605)}} > {{ at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:95)}} > {{ ... 8 more}} > The retries should have some form of backoff in this situation to avoid > flooding the logs and exhausting other resources on the server. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16470) Network failure causes Checkpoint Coordinator to flood disk with exceptions
[ https://issues.apache.org/jira/browse/FLINK-16470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17056583#comment-17056583 ] Jason Kania commented on FLINK-16470: - [~gjy], the restart strategy was just the default and had no delay so that explains the rapid restart. I did not understand that the restart strategy was related to this interaction with the Zookeeper and the RegionStrategy. Thanks for connecting the two. I will try with a restart strategy and see what happens another time if there is a similar failure. > Network failure causes Checkpoint Coordinator to flood disk with exceptions > --- > > Key: FLINK-16470 > URL: https://issues.apache.org/jira/browse/FLINK-16470 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / Coordination >Affects Versions: 1.9.2 > Environment: Latest patch current Ubuntu release with latest java 8 > JRE. >Reporter: Jason Kania >Priority: Major > > When a networking error occurred that prevented access to the shared folder > mounted over NFS, the CheckpointCoordinator flooded the logs with the > following: > > {{org.apache.flink.util.FlinkException: Could not retrieve checkpoint 158365 > from state handle under /0158365. This indicates that the > retrieved state handle is broken. Try cleaning the state handle store.}} > {{ at > org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.retrieveCompletedCheckpoint(ZooKeeperCompletedCheckpointStore.java:345)}} > {{ at > org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.recover(ZooKeeperCompletedCheckpointStore.java:175)}} > {{ at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1014)}} > {{ at > org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG.resetTasks(AdaptedRestartPipelinedRegionStrategyNG.java:205)}} > {{ at > org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG.lambda$createResetAndRescheduleTasksCallback$1(AdaptedRestartPipelinedRegionStrategyNG.java:149)}} > {{ at > org.apache.flink.runtime.concurrent.FutureUtils.lambda$scheduleWithDelay$3(FutureUtils.java:202)}} > {{ at > org.apache.flink.runtime.concurrent.FutureUtils.lambda$scheduleWithDelay$4(FutureUtils.java:226)}} > {{ at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)}} > {{ at java.util.concurrent.FutureTask.run(FutureTask.java:266)}} > {{ at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)}} > {{ at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)}} > {{ at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)}} > {{ at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)}} > {{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)}} > {{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)}} > {{ at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)}} > {{ at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)}} > {{ at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)}} > {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)}} > {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)}} > {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)}} > {{ at akka.actor.Actor.aroundReceive(Actor.scala:517)}} > {{ at 
akka.actor.Actor.aroundReceive$(Actor.scala:515)}} > {{ at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)}} > {{ at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)}} > {{ at akka.actor.ActorCell.invoke(ActorCell.scala:561)}} > {{ at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)}} > {{ at akka.dispatch.Mailbox.run(Mailbox.scala:225)}} > {{ at akka.dispatch.Mailbox.exec(Mailbox.scala:235)}} > {{ at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)}} > {{ at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)}} > {{ at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)}} > {{ at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)}} > {{Caused by: java.io.FileNotFoundException: > /mnt/shared/completedCheckpoint53ed9d9197f7 (No such file or directory)}} > {{ at java.io.FileInputStream.open0(Native Method)}} > {{ at java.io.FileInputStream.open(FileInputStream.java:195)}} > {{ at java.io.FileInputStream.(FileInputStream.java:138)}} > {{ at > org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)}} > {{ at >
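For anyone following along, a restart delay can be configured per job as well as in flink-conf.yaml. The sketch below shows the in-code variant using a fixed-delay strategy; the attempt count and delay are arbitrary example values, not a recommendation, and the flink-conf.yaml equivalents are the restart-strategy and restart-strategy.fixed-delay.* options.

{code:java}
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Restart a failed job at most 5 times with a 10 second pause between attempts,
        // instead of the immediate restarts that a zero delay produces.
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(5, Time.of(10, TimeUnit.SECONDS)));

        // ... define sources, transformations and sinks here, then:
        // env.execute("job-with-restart-delay");
    }
}
{code}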
[jira] [Commented] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets
[ https://issues.apache.org/jira/browse/FLINK-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17056533#comment-17056533 ] Jason Kania commented on FLINK-16468: - [~azagrebin], the only thing I saw was a lot of repeats of the IOException before the last one included here and the following SocketException. I did not see anything preceding it and the logs were deleted because of the flood and excess disk utilization. The debug logs for the BlobClient are now enabled. I will update this issue if the error occurs again. The blob.fetch.retries was not modified from the default value. > BlobClient rapid retrieval retries on failure opens too many sockets > > > Key: FLINK-16468 > URL: https://issues.apache.org/jira/browse/FLINK-16468 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.9.2 > Environment: Linux ubuntu servers running, patch current latest > Ubuntu patch current release java 8 JRE >Reporter: Jason Kania >Priority: Major > > In situations where the BlobClient retrieval fails as in the following log, > rapid retries will exhaust the open sockets. All the retries happen within a > few milliseconds. > {{2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - > Failed to fetch BLOB > cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7 > from aaa-1/10.0.1.1:45145 and store it under > /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 > Retrying...}} > The above is output repeatedly until the following error occurs: > {{java.io.IOException: Could not connect to BlobServer at address > aaa-1/10.0.1.1:45145}} > {{ at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:100)}} > {{ at > org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)}} > {{ at > org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)}} > {{ at > org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)}} > {{ at > org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)}} > {{ at > org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)}} > {{ at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)}} > {{ at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)}} > {{ at java.lang.Thread.run(Thread.java:748)}} > {{Caused by: java.net.SocketException: Too many open files}} > {{ at java.net.Socket.createImpl(Socket.java:478)}} > {{ at java.net.Socket.connect(Socket.java:605)}} > {{ at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:95)}} > {{ ... 8 more}} > The retries should have some form of backoff in this situation to avoid > flooding the logs and exhausting other resources on the server. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16475) PulsarConsumerSource messageReceiveTimeoutMs parameter should be configurable
Jason Kania created FLINK-16475: --- Summary: PulsarConsumerSource messageReceiveTimeoutMs parameter should be configurable Key: FLINK-16475 URL: https://issues.apache.org/jira/browse/FLINK-16475 Project: Flink Issue Type: Bug Affects Versions: 1.9.2 Environment: Not relevant Reporter: Jason Kania The messageReceiveTimeoutMs value in the PulsarConsumerSource class is hardcoded to 100ms but should be configurable to accommodate different hardware setups such as developer labs where the performance level may not be as critical. -- This message was sent by Atlassian Jira (v8.3.4#803005)
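As a minimal hypothetical sketch of what a configurable receive timeout could look like, the class below exposes the poll timeout as a constructor argument instead of a hardcoded constant. The class and field names are invented for illustration and this is not the actual connector source; only Consumer.receive(int, TimeUnit) from the Pulsar client API is assumed.

{code:java}
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;

/** Illustrative wrapper; not the actual PulsarConsumerSource implementation. */
public class ConfigurableTimeoutPoller {

    private final Consumer<byte[]> consumer;
    private final long receiveTimeoutMs;

    public ConfigurableTimeoutPoller(Consumer<byte[]> consumer, long receiveTimeoutMs) {
        this.consumer = consumer;
        // Exposed as a constructor argument instead of a hardcoded 100 ms constant.
        this.receiveTimeoutMs = receiveTimeoutMs;
    }

    /** Polls once, returning null if nothing arrived within the configured timeout. */
    public Message<byte[]> pollOnce() throws PulsarClientException {
        return consumer.receive((int) receiveTimeoutMs, TimeUnit.MILLISECONDS);
    }
}
{code}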
[jira] [Created] (FLINK-16470) Network failure causes Checkpoint Coordinator to flood disk with exceptions
Jason Kania created FLINK-16470: --- Summary: Network failure causes Checkpoint Coordinator to flood disk with exceptions Key: FLINK-16470 URL: https://issues.apache.org/jira/browse/FLINK-16470 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.9.2 Environment: Latest patch current Ubuntu release with latest java 8 JRE. Reporter: Jason Kania When a networking error occurred that prevented access to the shared folder mounted over NFS, the CheckpointCoordinator flooded the logs with the following: {{org.apache.flink.util.FlinkException: Could not retrieve checkpoint 158365 from state handle under /0158365. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.}} {{ at org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.retrieveCompletedCheckpoint(ZooKeeperCompletedCheckpointStore.java:345)}} {{ at org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.recover(ZooKeeperCompletedCheckpointStore.java:175)}} {{ at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1014)}} {{ at org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG.resetTasks(AdaptedRestartPipelinedRegionStrategyNG.java:205)}} {{ at org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG.lambda$createResetAndRescheduleTasksCallback$1(AdaptedRestartPipelinedRegionStrategyNG.java:149)}} {{ at org.apache.flink.runtime.concurrent.FutureUtils.lambda$scheduleWithDelay$3(FutureUtils.java:202)}} {{ at org.apache.flink.runtime.concurrent.FutureUtils.lambda$scheduleWithDelay$4(FutureUtils.java:226)}} {{ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)}} {{ at java.util.concurrent.FutureTask.run(FutureTask.java:266)}} {{ at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)}} {{ at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)}} {{ at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)}} {{ at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)}} {{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)}} {{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)}} {{ at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)}} {{ at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)}} {{ at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)}} {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)}} {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)}} {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)}} {{ at akka.actor.Actor.aroundReceive(Actor.scala:517)}} {{ at akka.actor.Actor.aroundReceive$(Actor.scala:515)}} {{ at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)}} {{ at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)}} {{ at akka.actor.ActorCell.invoke(ActorCell.scala:561)}} {{ at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)}} {{ at akka.dispatch.Mailbox.run(Mailbox.scala:225)}} {{ at akka.dispatch.Mailbox.exec(Mailbox.scala:235)}} {{ at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)}} {{ at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)}} {{ at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)}} {{ at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)}} {{Caused by: java.io.FileNotFoundException: /mnt/shared/completedCheckpoint53ed9d9197f7 (No such file or directory)}} {{ at java.io.FileInputStream.open0(Native Method)}} {{ at java.io.FileInputStream.open(FileInputStream.java:195)}} {{ at java.io.FileInputStream.(FileInputStream.java:138)}} {{ at org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)}} {{ at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)}} {{ at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)}} {{ at org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:64)}} {{ at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:57)}} {{ at org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.retrieveCompletedCheckpoint(ZooKeeperCompletedCheckpointStore.java:339)}} {{ ... 32 more}} The result was very high CPU utilization, high disk IO and a cascade of other application failures. In this situation, there should be a backoff on the retries to not bring down the whole node. Recovery
[jira] [Updated] (FLINK-15744) Some TaskManager Task exceptions are logged as info
[ https://issues.apache.org/jira/browse/FLINK-15744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Kania updated FLINK-15744: Summary: Some TaskManager Task exceptions are logged as info (was: Some TaskManager Task exception are logged as info) > Some TaskManager Task exceptions are logged as info > --- > > Key: FLINK-15744 > URL: https://issues.apache.org/jira/browse/FLINK-15744 > Project: Flink > Issue Type: Bug > Components: Runtime / Configuration >Affects Versions: 1.9.1 > Environment: 1.9.1 on a recent ubuntu release >Reporter: Jason Kania >Assignee: Jason Kania >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 20m > Remaining Estimate: 0h > > After investigating to find the cause for a submitted job alternating between > the deploying and running states, I tried setting the logging to info level > and realized that some deployment exceptions were being logged as info when > they really should be a warning or error level. > In particular, the following line (at line 960) of the 1.9.1 branch of the > Task class logs as info: > LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, > currentState, newState, cause); > This log line always has a non null exception cause so it would make sense to > log at warning or error level for this state transition so it is more easily > found. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16468) BlobClient rapid retrieval retries on failure opens too many sockets
Jason Kania created FLINK-16468: --- Summary: BlobClient rapid retrieval retries on failure opens too many sockets Key: FLINK-16468 URL: https://issues.apache.org/jira/browse/FLINK-16468 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.9.2 Environment: Linux ubuntu servers running, patch current latest Ubuntu patch current release java 8 JRE Reporter: Jason Kania In situations where the BlobClient retrieval fails as in the following log, rapid retries will exhaust the open sockets. All the retries happen within a few milliseconds. {{2020-03-06 17:19:07,116 ERROR org.apache.flink.runtime.blob.BlobClient - Failed to fetch BLOB cddd17ef76291dd60eee9fd36085647a/p-bcd61652baba25d6863cf17843a2ef64f4c801d5-c1781532477cf65ff1c1e7d72dccabc7 from aaa-1/10.0.1.1:45145 and store it under /tmp/blobStore-7328ed37-8bc7-4af7-a56c-474e264157c9/incoming/temp-0004 Retrying...}} The above is output repeatedly until the following error occurs: {{java.io.IOException: Could not connect to BlobServer at address aaa-1/10.0.1.1:45145}} {{ at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:100)}} {{ at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)}} {{ at org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)}} {{ at org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)}} {{ at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)}} {{ at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)}} {{ at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)}} {{ at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)}} {{ at java.lang.Thread.run(Thread.java:748)}} {{Caused by: java.net.SocketException: Too many open files}} {{ at java.net.Socket.createImpl(Socket.java:478)}} {{ at java.net.Socket.connect(Socket.java:605)}} {{ at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:95)}} {{ ... 8 more}} The retries should have some form of backoff in this situation to avoid flooding the logs and exhausting other resources on the server. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15744) Some TaskManager Task exception are logged as info
Jason Kania created FLINK-15744: --- Summary: Some TaskManager Task exception are logged as info Key: FLINK-15744 URL: https://issues.apache.org/jira/browse/FLINK-15744 Project: Flink Issue Type: Bug Components: Runtime / Configuration Affects Versions: 1.9.1 Environment: 1.9.1 on a recent Ubuntu release Reporter: Jason Kania After investigating to find the cause for a submitted job alternating between the deploying and running states, I tried setting the logging to info level and realized that some deployment exceptions were being logged as info when they really should be at warning or error level. In particular, the following line (at line 960) of the 1.9.1 branch of the Task class logs as info: LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState, cause); This log line always has a non-null exception cause so it would make sense to log at warning or error level for this state transition so it is more easily found. -- This message was sent by Atlassian Jira (v8.3.4#803005)
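The change being proposed amounts to choosing the log level based on whether a failure cause is present. A minimal sketch of that pattern is shown below; it is not the literal Task.java diff, and the method and parameter types are simplified for illustration.

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StateTransitionLogging {

    private static final Logger LOG = LoggerFactory.getLogger(StateTransitionLogging.class);

    /** Logs a task state transition, escalating to WARN when a failure cause is attached. */
    static void logStateTransition(
            String taskNameWithSubtask,
            String executionId,
            String currentState,
            String newState,
            Throwable cause) {
        if (cause == null) {
            LOG.info("{} ({}) switched from {} to {}.",
                    taskNameWithSubtask, executionId, currentState, newState);
        } else {
            // A non-null cause means the transition is failure-related, so surface it
            // at WARN level rather than burying it in INFO output.
            LOG.warn("{} ({}) switched from {} to {}.",
                    taskNameWithSubtask, executionId, currentState, newState, cause);
        }
    }
}
{code}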
[jira] [Commented] (FLINK-11799) KryoSerializer/OperatorChain ignores copy failure resulting in NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-11799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814589#comment-16814589 ] Jason Kania commented on FLINK-11799: - Hi Liya, Your approach seems reasonable. Thanks, Jason > KryoSerializer/OperatorChain ignores copy failure resulting in > NullPointerException > --- > > Key: FLINK-11799 > URL: https://issues.apache.org/jira/browse/FLINK-11799 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.2 >Reporter: Jason Kania >Priority: Major > > I was encountering a problem with NullPointerExceptions with the deserialized > object reaching my ProcessFunction process() method implementation as a null > value. Upon investigation, I discovered two issues with the implementation of > the KryoSerializer copy(). > 1) The 'public T copy(T from)' method swallows the error if the kryo copy() > call generates an exception. The code should report the copy error at least > once as a warning to be aware that the kryo copy() is failing. I understand > that the code is there to handle the lack of a copy implementation but due to > the potential inefficiency of having to write and read the object instead of > copying it, this would seem useful information to share at the least. It is > also important to have a warning in case the cause of the copy error is > something that needs to be fixed. > 2) The call to 'kryo.readObject(input, from.getClass())' does not handle the > fact that the kryo readObject(Input input, Class aClass) method may return a > null value if there are any issues. This could be handled with a check or > warning in the OperatorChain.CopyingChainingOutput.pushToOperator() method > but is also ignored there, allowing a null value to be passed along without > providing any reason for the null value in logging. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11799) KryoSerializer/OperatorChain ignores copy failure resulting in NullPointerException
Jason Kania created FLINK-11799: --- Summary: KryoSerializer/OperatorChain ignores copy failure resulting in NullPointerException Key: FLINK-11799 URL: https://issues.apache.org/jira/browse/FLINK-11799 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.7.2 Reporter: Jason Kania I was encountering a problem with NullPointerExceptions with the deserialized object reaching my ProcessFunction process() method implementation as a null value. Upon investigation, I discovered two issues with the implementation of the KryoSerializer copy(). 1) The 'public T copy(T from)' method swallows the error if the kryo copy() call generates an exception. The code should report the copy error at least once as a warning to be aware that the kryo copy() is failing. I understand that the code is there to handle the lack of a copy implementation but due to the potential inefficiency of having to write and read the object instead of copying it, this would seem useful information to share at the least. It is also important to have a warning in case the cause of the copy error is something that needs to be fixed. 2) The call to 'kryo.readObject(input, from.getClass())' does not handle the fact that the kryo readObject(Input input, Class aClass) method may return a null value if there are any issues. This could be handled with a check or warning in the OperatorChain.CopyingChainingOutput.pushToOperator() method but is also ignored there, allowing a null value to be passed along without providing any reason for the null value in logging. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
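To make the two points above concrete, here is an illustrative sketch of the copy-with-fallback pattern being discussed. It is not the actual Flink KryoSerializer code, and the serialize/deserialize fallback is simplified to in-memory byte buffers; only kryo.copy(...), kryo.writeObject(...) and kryo.readObject(...) from the Kryo API are assumed.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Illustrative copy-with-fallback helper; not the actual Flink KryoSerializer. */
public class LoggingKryoCopy {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingKryoCopy.class);

    @SuppressWarnings("unchecked")
    public static <T> T copy(Kryo kryo, T from) {
        if (from == null) {
            return null;
        }
        try {
            return kryo.copy(from);
        } catch (KryoException e) {
            // Point 1: do not swallow the failure silently; record that the cheaper
            // copy path is failing before falling back to serialize/deserialize.
            LOG.warn("Kryo copy of {} failed, falling back to serialize/deserialize.",
                    from.getClass().getName(), e);
        }

        // Fallback: write the object out and read it back.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (Output output = new Output(bytes)) {
            kryo.writeObject(output, from);
        }
        T result;
        try (Input input = new Input(new ByteArrayInputStream(bytes.toByteArray()))) {
            result = kryo.readObject(input, (Class<T>) from.getClass());
        }
        // Point 2: readObject can hand back null; fail loudly here instead of passing
        // a null record downstream where it surfaces later as a NullPointerException.
        if (result == null) {
            throw new IllegalStateException(
                    "Kryo readObject returned null for " + from.getClass().getName());
        }
        return result;
    }
}
{code}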
[jira] [Commented] (FLINK-8035) Unable to submit job when HA is enabled
[ https://issues.apache.org/jira/browse/FLINK-8035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16622792#comment-16622792 ] Jason Kania commented on FLINK-8035:

I would need to know the specific logs you would like to see and the log configuration. The problem that I saw was that there were no logs indicating any issue even with trace enabled. I would simply get a message at the point of timeout without any preceding logs indicating that any messages had been sent.

> Unable to submit job when HA is enabled
> ---
>
> Key: FLINK-8035
> URL: https://issues.apache.org/jira/browse/FLINK-8035
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 1.4.0
> Environment: Mac OS X
> Reporter: Robert Metzger
> Priority: Critical
>
> Steps to reproduce:
> - Get Flink 1.4 (f5a0b4bdfb)
> - Get ZK (3.3.6 in this case)
> - Put the following flink-conf.yaml:
> {code}
> high-availability: zookeeper
> high-availability.storageDir: file:///tmp/flink-ha
> high-availability.zookeeper.quorum: localhost:2181
> high-availability.zookeeper.path.cluster-id: /my-namespace
> {code}
> - Start Flink, submit a job (any streaming example will do)
> The job submission will time out. On the JobManager, it seems that the job submission gets stuck when trying to submit something to Zookeeper.
> In the JM UI, the job will sit there in status "CREATED"

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8035) Unable to submit job when HA is enabled
[ https://issues.apache.org/jira/browse/FLINK-8035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16615275#comment-16615275 ] Jason Kania commented on FLINK-8035:

I encountered this issue in 1.5.3 and subsequently had to roll back to 1.4.2. I had cleaned out the ZooKeeper data, but the issue remained. I was unable to trace it to a known cause. I suspect a swallowed error in the ZooKeeper communication, because the code that performs the actual low-level send for the job list was never executed, or at least the breakpoint there was never hit.

> Unable to submit job when HA is enabled
> ---
>
> Key: FLINK-8035
> URL: https://issues.apache.org/jira/browse/FLINK-8035
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 1.4.0
> Environment: Mac OS X
> Reporter: Robert Metzger
> Priority: Critical
>
> Steps to reproduce:
> - Get Flink 1.4 (f5a0b4bdfb)
> - Get ZK (3.3.6 in this case)
> - Put the following flink-conf.yaml:
> {code}
> high-availability: zookeeper
> high-availability.storageDir: file:///tmp/flink-ha
> high-availability.zookeeper.quorum: localhost:2181
> high-availability.zookeeper.path.cluster-id: /my-namespace
> {code}
> - Start Flink, submit a job (any streaming example will do)
> The job submission will time out. On the JobManager, it seems that the job submission gets stuck when trying to submit something to Zookeeper.
> In the JM UI, the job will sit there in status "CREATED"

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8035) Unable to submit job when HA is enabled
[ https://issues.apache.org/jira/browse/FLINK-8035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605052#comment-16605052 ] Jason Kania commented on FLINK-8035:

This issue affects more than just submission. Many of the Flink command-line calls also time out because of this issue. In my case, I am unable to upgrade ZooKeeper because of other components, so I have had to abandon HA mode.

> Unable to submit job when HA is enabled
> ---
>
> Key: FLINK-8035
> URL: https://issues.apache.org/jira/browse/FLINK-8035
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 1.4.0
> Environment: Mac OS X
> Reporter: Robert Metzger
> Priority: Critical
>
> Steps to reproduce:
> - Get Flink 1.4 (f5a0b4bdfb)
> - Get ZK (3.3.6 in this case)
> - Put the following flink-conf.yaml:
> {code}
> high-availability: zookeeper
> high-availability.storageDir: file:///tmp/flink-ha
> high-availability.zookeeper.quorum: localhost:2181
> high-availability.zookeeper.path.cluster-id: /my-namespace
> {code}
> - Start Flink, submit a job (any streaming example will do)
> The job submission will time out. On the JobManager, it seems that the job submission gets stuck when trying to submit something to Zookeeper.
> In the JM UI, the job will sit there in status "CREATED"

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9371) High Availability JobManager Registration Failure
Jason Kania created FLINK-9371: -- Summary: High Availability JobManager Registration Failure Key: FLINK-9371 URL: https://issues.apache.org/jira/browse/FLINK-9371 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.4.2 Reporter: Jason Kania

The following error is happening intermittently on a 3-node cluster with 2 JobManagers configured in HA mode. When this happens, the two JobManager instances are associated with one another.

2018-05-15 19:00:06,400 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Trying to associate with JobManager leader akka.tcp://flink@aaa-1:5/user/jobmanager
2018-05-15 19:00:06,404 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(0bbe70c4-2642-4a08-912f-6cc09646281f,RegisterResourceManager akka://flink/user/resourcemanager-d6567c5d-85f4-4b18-8eac-cf9725d076a5) because there is currently no valid leader id known.
2018-05-15 19:00:16,418 ERROR org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Resource manager could not register at JobManager
akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka://flink/), Path(/user/jobmanager)]] after [1 ms]. Sender[null] sent message of type "org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage".
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)

Sometimes the following type of log also appears after the previous one:

2018-05-15 19:13:47,525 WARN org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Discard message LeaderSessionMessage(5cab29b9-10d3-4b25-b934-f06b82be15b5,TriggerRegistrationAtJobManager akka.tcp://flink@aaa-1:5/user/jobmanager) because the expected leader session ID 61075587-51da-4e58-ac4f-9ea118ccdde9 did not equal the received leader session ID 5cab29b9-10d3-4b25-b934-f06b82be15b5.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)