[jira] [Updated] (FLINK-4114) Need a way to manage multiple named, long-lived jobs on a single YARN cluster in an automated manner
[ https://issues.apache.org/jira/browse/FLINK-4114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4114: -- Component/s: YARN Client Issue Type: Improvement (was: Bug) > Need a way to manage multiple named, long-lived jobs on a single YARN cluster > in an automated manner > > > Key: FLINK-4114 > URL: https://issues.apache.org/jira/browse/FLINK-4114 > Project: Flink > Issue Type: Improvement > Components: YARN Client >Reporter: Chris Hogue > > We are running several Flink jobs on a single YARN cluster. Currently each > Flink job is run in its own YARN session (and thus its own YARN application > ID). The difficulty comes in that we want to manage each of these jobs > individually by name. For example we want to start, stop, or update one job > without affecting others. The primary access to these jobs is via the YARN > application ID, which gives no indication of which Flink job it is > running. > It would be nice if we had tools that would allow us to manage the Flink jobs > by name and have them do the right thing with the YARN session. Today we can > use 'flink run' and have it start a YARN session for that job, but from that > point forward we have only the YARN application ID to work with. > As a concrete example, suppose we have 2 jobs with names JobA and JobB. We'd > want a way to do something like: > flink run ; flink run > We'd then want to be able to call: > flink cancel JobA > The cancel command would spin down the YARN session for JobA in addition to > the Flink job, leaving JobB running as normal. I've simplified the commands, > leaving out other options for illustrative purposes. And we'll want to be > able to use savepoints through these steps as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
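The missing piece the issue describes is essentially a mapping from user-chosen job names to YARN application IDs. As a rough illustration only (none of these class or method names exist in Flink; they are assumptions for this sketch), such tooling could keep a registry like:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a registry mapping user-chosen job names to YARN
// application IDs, so that a "flink cancel JobA" wrapper could resolve the
// right session. These types do not exist in Flink; they only illustrate
// the bookkeeping the requested tooling would need.
class NamedJobRegistry {
    private final Map<String, String> nameToAppId = new ConcurrentHashMap<>();

    /** Record the YARN application ID of the session started for a named job. */
    public void register(String jobName, String yarnApplicationId) {
        nameToAppId.put(jobName, yarnApplicationId);
    }

    /** Resolve a job name to its YARN application ID, or null if unknown. */
    public String resolve(String jobName) {
        return nameToAppId.get(jobName);
    }

    /** Forget a job once its YARN session has been torn down. */
    public String unregister(String jobName) {
        return nameToAppId.remove(jobName);
    }
}
```

A cancel wrapper would then resolve the name, cancel the Flink job (optionally with a savepoint), kill the resolved YARN application, and unregister the name.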
[jira] [Assigned] (FLINK-4085) Set Kinesis Consumer Agent to Flink
[ https://issues.apache.org/jira/browse/FLINK-4085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger reassigned FLINK-4085: - Assignee: Robert Metzger > Set Kinesis Consumer Agent to Flink > --- > > Key: FLINK-4085 > URL: https://issues.apache.org/jira/browse/FLINK-4085 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: 1.1.0 > > > Currently, we use the default Kinesis Agent name. > I was asked by Amazon to set it to something containing Flink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4127) Clean up configuration and check breaking API changes
Robert Metzger created FLINK-4127: - Summary: Clean up configuration and check breaking API changes Key: FLINK-4127 URL: https://issues.apache.org/jira/browse/FLINK-4127 Project: Flink Issue Type: Improvement Reporter: Robert Metzger Fix For: 1.1.0 For the upcoming 1.1 release, I'll check if there are any breaking API changes and if the documentation is up to date with the configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4127) Clean up configuration and check breaking API changes
[ https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4127: -- Attachment: flink-java.html flink-core.html > Clean up configuration and check breaking API changes > - > > Key: FLINK-4127 > URL: https://issues.apache.org/jira/browse/FLINK-4127 > Project: Flink > Issue Type: Improvement >Reporter: Robert Metzger > Fix For: 1.1.0 > > Attachments: flink-core.html, flink-java.html > > > For the upcoming 1.1 release, I'll check if there are any breaking API > changes and if the documentation is up to date with the configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4127) Clean up configuration and check breaking API changes
[ https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4127: -- Attachment: flink-streaming-scala.html flink-streaming-java.html flink-scala.html I added the japicmp reports for the covered modules. > Clean up configuration and check breaking API changes > - > > Key: FLINK-4127 > URL: https://issues.apache.org/jira/browse/FLINK-4127 > Project: Flink > Issue Type: Improvement >Reporter: Robert Metzger > Fix For: 1.1.0 > > Attachments: flink-core.html, flink-java.html, flink-scala.html, > flink-streaming-java.html, flink-streaming-scala.html > > > For the upcoming 1.1 release, I'll check if there are any breaking API > changes and if the documentation is up to date with the configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4114) Need a way to manage multiple named, long-lived jobs on a single YARN cluster in an automated manner
[ https://issues.apache.org/jira/browse/FLINK-4114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15353064#comment-15353064 ] Robert Metzger commented on FLINK-4114: --- Hi Chris, I'm happy to hear that you've found a solution for now. I'm currently pretty busy preparing the 1.1 release (and I think other Flink committers are as well). Afterwards, I'm planning to look into solutions for your feature request. > Need a way to manage multiple named, long-lived jobs on a single YARN cluster > in an automated manner > > > Key: FLINK-4114 > URL: https://issues.apache.org/jira/browse/FLINK-4114 > Project: Flink > Issue Type: Improvement > Components: YARN Client >Reporter: Chris Hogue > > We are running several Flink jobs on a single YARN cluster. Currently each > Flink job is run in its own YARN session (and thus its own YARN application > ID). The difficulty comes in that we want to manage each of these jobs > individually by name. For example we want to start, stop, or update one job > without affecting others. The primary access to these jobs is via the YARN > application ID, which gives no indication of which Flink job it is > running. > It would be nice if we had tools that would allow us to manage the Flink jobs > by name and have them do the right thing with the YARN session. Today we can > use 'flink run' and have it start a YARN session for that job, but from that > point forward we have only the YARN application ID to work with. > As a concrete example, suppose we have 2 jobs with names JobA and JobB. We'd > want a way to do something like: > flink run ; flink run > We'd then want to be able to call: > flink cancel JobA > The cancel command would spin down the YARN session for JobA in addition to > the Flink job, leaving JobB running as normal. I've simplified the commands, > leaving out other options for illustrative purposes. And we'll want to be > able to use savepoints through these steps as well. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4127) Clean up configuration and check breaking API changes
[ https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger reassigned FLINK-4127: - Assignee: Robert Metzger > Clean up configuration and check breaking API changes > - > > Key: FLINK-4127 > URL: https://issues.apache.org/jira/browse/FLINK-4127 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: 1.1.0 > > Attachments: flink-core.html, flink-java.html, flink-scala.html, > flink-streaming-java.html, flink-streaming-scala.html > > > For the upcoming 1.1 release, I'll check if there are any breaking API > changes and if the documentation is up to date with the configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4127) Clean up configuration and check breaking API changes
[ https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4127: -- Component/s: Build System > Clean up configuration and check breaking API changes > - > > Key: FLINK-4127 > URL: https://issues.apache.org/jira/browse/FLINK-4127 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: 1.1.0 > > Attachments: flink-core.html, flink-java.html, flink-scala.html, > flink-streaming-java.html, flink-streaming-scala.html > > > For the upcoming 1.1 release, I'll check if there are any breaking API > changes and if the documentation is up to date with the configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records
[ https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4080. --- Resolution: Fixed Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/fa42cdab > Kinesis consumer not exactly-once if stopped in the middle of processing > aggregated records > --- > > Key: FLINK-4080 > URL: https://issues.apache.org/jira/browse/FLINK-4080 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.1.0 > > > I've occasionally experienced unsuccessful ManualExactlyOnceTest runs after > several tries. > Kinesis records of the same aggregated batch will have the same sequence > number, and different sub-sequence numbers > (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html). > The current code of the consumer is committing state every time it finishes > processing a record, even de-aggregated ones. This is a bug since it will > incorrectly mark all remaining records of the de-aggregated batch as > processed in the state. > Proposed fix: > 1. Use the extended `UserRecord` class in KCL to represent all records > (either non- or de-aggregated) instead of the basic `Record` class. This > gives access to whether or not the record was originally aggregated. > 2. The sequence number state we are checkpointing needs to be able to > indicate that the last seen sequence number of a shard may point into an > aggregated record, i.e., {"shard0" -> "5:8", "shard1" -> "2"} meaning the 8th > sub-record of the 5th record was last seen for shard 0. On restore, we start > again from record 5 for shard 0 and skip the first 7 sub-records; however, > for shard 1 we start from record 3 since record 2 is non-aggregated and > already fully processed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
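The state encoding proposed in point 2 can be sketched as follows. This is a minimal illustration of the "5:8"-style state from the description above, not the connector's actual classes; all names are assumptions, and sub-sequence numbers are treated as 1-based with 0 meaning "not aggregated":

```java
// Hypothetical sketch of a checkpointed sequence number that can also carry
// a sub-sequence number for records that were part of an aggregated Kinesis
// record, e.g. the "5:8" state from the issue description.
class ShardSequenceState {
    private final long sequenceNumber;
    private final long subSequenceNumber; // 0 if the record was not aggregated

    ShardSequenceState(long sequenceNumber, long subSequenceNumber) {
        this.sequenceNumber = sequenceNumber;
        this.subSequenceNumber = subSequenceNumber;
    }

    boolean isAggregated() {
        return subSequenceNumber > 0;
    }

    /** On restore: the record to start reading from again. */
    long restartSequenceNumber() {
        // An aggregated record must be re-read so its remaining sub-records
        // can be emitted; a plain record is fully processed, so continue
        // with the next one.
        return isAggregated() ? sequenceNumber : sequenceNumber + 1;
    }

    /** On restore: how many leading sub-records of that record to skip. */
    long subRecordsToSkip() {
        // Per the description: state "5:8" restores to record 5 and skips
        // the first 7 sub-records.
        return isAggregated() ? subSequenceNumber - 1 : 0;
    }
}
```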
[jira] [Resolved] (FLINK-4033) Missing Scala example snippets for the Kinesis Connector documentation
[ https://issues.apache.org/jira/browse/FLINK-4033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4033. --- Resolution: Fixed Merged in http://git-wip-us.apache.org/repos/asf/flink/commit/256c9c4d Thank you for all your work! > Missing Scala example snippets for the Kinesis Connector documentation > -- > > Key: FLINK-4033 > URL: https://issues.apache.org/jira/browse/FLINK-4033 > Project: Flink > Issue Type: Sub-task > Components: Documentation, Kinesis Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Minor > Fix For: 1.1.0 > > > The documentation for the Kinesis connector is missing Scala version of the > example snippets. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4085) Set Kinesis Consumer Agent to Flink
[ https://issues.apache.org/jira/browse/FLINK-4085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4085. --- Resolution: Fixed Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/123be227 > Set Kinesis Consumer Agent to Flink > --- > > Key: FLINK-4085 > URL: https://issues.apache.org/jira/browse/FLINK-4085 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: 1.1.0 > > > Currently, we use the default Kinesis Agent name. > I was asked by Amazon to set it to something containing Flink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3294) KafkaConsumer (0.8) commit offsets using SimpleConsumer.commitOffsets()
[ https://issues.apache.org/jira/browse/FLINK-3294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15355119#comment-15355119 ] Robert Metzger commented on FLINK-3294: --- Hi Jens, My code is certainly outdated by now, because the Kafka Consumer has been refactored in the meantime. Still, it shouldn't be too hard to add it to the current codebase again. Also, the code I've posted didn't work yet (I think I was unable to retrieve committed offsets from the broker). I think it's probably easier to re-implement it from scratch, using my old code as a reference. How important is the feature for you? If you want to have it included in a release, we have to hurry up, because the Flink community is currently fixing the last bugs before the 1.1.0 release, and it'll soon be out (2-4 weeks). The next release will need ~3 months. If a snapshot version is okay for you, we can do it independently of the release. > KafkaConsumer (0.8) commit offsets using SimpleConsumer.commitOffsets() > --- > > Key: FLINK-3294 > URL: https://issues.apache.org/jira/browse/FLINK-3294 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Robert Metzger >Assignee: Robert Metzger > > Currently, the 0.8 consumer for Kafka is committing the offsets manually into > Zookeeper so that users can track the lag using external tools. > The 0.8 consumer has a pluggable design, and this component is easily > pluggable. > Since OffsetCommitRequest version=1 (supported in 0.8.2 or later), users can > choose between two offset commit modes: > a) Let the broker commit into ZK (this is what we are doing from the consumer) > b) Let the broker commit the offset into a special topic. > By adding a different "OffsetHandler" backend, users can commit offsets > through the brokers (reducing the total number of ZK connections) or into the > broker's offset topic. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4133) Reflect streaming file source changes in documentation
Robert Metzger created FLINK-4133: - Summary: Reflect streaming file source changes in documentation Key: FLINK-4133 URL: https://issues.apache.org/jira/browse/FLINK-4133 Project: Flink Issue Type: Bug Components: DataStream API, Documentation Reporter: Robert Metzger In FLINK-2314, the file sources for the DataStream API were reworked. The documentation doesn't explain the (new?) semantics of the file sources. In which order are files read? How are file modifications treated? (appends, in-place modifications?) I suspect this table is also not up-to-date: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/fault_tolerance.html#fault-tolerance-guarantees-of-data-sources-and-sinks -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4137) JobManager web frontend does not shut down on OOM exception on JM
Robert Metzger created FLINK-4137: - Summary: JobManager web frontend does not shut down on OOM exception on JM Key: FLINK-4137 URL: https://issues.apache.org/jira/browse/FLINK-4137 Project: Flink Issue Type: Bug Components: Distributed Coordination, JobManager, Webfrontend Reporter: Robert Metzger Priority: Critical After the following exception on the JobManager:
{code}
2016-06-30 14:45:06,642 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 379 (in 7017 ms)
2016-06-30 14:45:06,642 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 380 @ 1467297906642
2016-06-30 14:45:17,902 ERROR akka.actor.ActorSystemImpl - Uncaught fatal error from thread [flink-akka.remote.default-remote-dispatcher-6] shutting down ActorSystem [flink]
java.lang.OutOfMemoryError: Java heap space
    at com.google.protobuf.ByteString.copyFrom(ByteString.java:192)
    at com.google.protobuf.CodedInputStream.readBytes(CodedInputStream.java:324)
    at akka.remote.WireFormats$SerializedMessage.<init>(WireFormats.java:3030)
    at akka.remote.WireFormats$SerializedMessage.<init>(WireFormats.java:2980)
    at akka.remote.WireFormats$SerializedMessage$1.parsePartialFrom(WireFormats.java:3073)
    at akka.remote.WireFormats$SerializedMessage$1.parsePartialFrom(WireFormats.java:3068)
    at com.google.protobuf.CodedInputStream.readMessage(CodedInputStream.java:309)
    at akka.remote.WireFormats$RemoteEnvelope.<init>(WireFormats.java:993)
    at akka.remote.WireFormats$RemoteEnvelope.<init>(WireFormats.java:927)
    at akka.remote.WireFormats$RemoteEnvelope$1.parsePartialFrom(WireFormats.java:1049)
    at akka.remote.WireFormats$RemoteEnvelope$1.parsePartialFrom(WireFormats.java:1044)
    at com.google.protobuf.CodedInputStream.readMessage(CodedInputStream.java:309)
    at akka.remote.WireFormats$AckAndEnvelopeContainer.<init>(WireFormats.java:241)
    at akka.remote.WireFormats$AckAndEnvelopeContainer.<init>(WireFormats.java:175)
    at akka.remote.WireFormats$AckAndEnvelopeContainer$1.parsePartialFrom(WireFormats.java:279)
    at akka.remote.WireFormats$AckAndEnvelopeContainer$1.parsePartialFrom(WireFormats.java:274)
    at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
    at akka.remote.WireFormats$AckAndEnvelopeContainer.parseFrom(WireFormats.java:409)
    at akka.remote.transport.AkkaPduProtobufCodec$.decodeMessage(AkkaPduCodec.scala:181)
    at akka.remote.EndpointReader.akka$remote$EndpointReader$$tryDecodeMessageAndAck(Endpoint.scala:995)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:928)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
2016-06-30 14:45:18,502 INFO org.apache.flink.yarn.YarnJobManager - Stopping JobManager akka.tcp://flink@172.31.23.121:45569/user/jobmanager.
2016-06-30 14:45:18,533 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom File Source (1/1) (5f2a1062c796ec6098a0a88227b9eab4) switched from RUNNING to CANCELING
{code}
The JobManager JVM keeps running (keeping the YARN session alive) because the web monitor is not stopped on such errors. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3294) KafkaConsumer (0.8) commit offsets using SimpleConsumer.commitOffsets()
[ https://issues.apache.org/jira/browse/FLINK-3294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358546#comment-15358546 ] Robert Metzger commented on FLINK-3294: --- Hi Jens, the Kafka 0.9 consumer doesn't commit the offsets to Zookeeper. They are committed to the Kafka broker. Only Flink's Kafka 0.8 consumer commits to ZK. That's why the commit code is different in the two implementations. The Kafka 0.9 consumer also should not have any ZK dependencies. What you have to do is to introduce an interface "OffsetHandler" into the 0.8 fetcher that has two implementations: a Zookeeper offset handler (the current code) and a Kafka offset handler (new code). In the diff of the branch I posted above, you can see this structure already: https://github.com/apache/flink/compare/master...rmetzger:yangjun_fix > KafkaConsumer (0.8) commit offsets using SimpleConsumer.commitOffsets() > --- > > Key: FLINK-3294 > URL: https://issues.apache.org/jira/browse/FLINK-3294 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Robert Metzger >Assignee: Robert Metzger > > Currently, the 0.8 consumer for Kafka is committing the offsets manually into > Zookeeper so that users can track the lag using external tools. > The 0.8 consumer has a pluggable design, and this component is easily > pluggable. > Since OffsetCommitRequest version=1 (supported in 0.8.2 or later), users can > choose between two offset commit modes: > a) Let the broker commit into ZK (this is what we are doing from the consumer) > b) Let the broker commit the offset into a special topic. > By adding a different "OffsetHandler" backend, users can commit offsets > through the brokers (reducing the total number of ZK connections) or into the > broker's offset topic. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
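The pluggable structure described above could look roughly like this. The "OffsetHandler" name comes from the issue text; the method names and stub bodies are assumptions for illustration, not the actual Flink code:

```java
import java.util.Map;

// Hedged sketch of the pluggable offset-commit backend described in the
// issue: one interface, two interchangeable implementations.
interface OffsetHandler {
    /** Commit the given partition -> offset map to the backend. */
    void commit(Map<Integer, Long> partitionOffsets);

    /** Fetch the last committed offset for a partition, or null if none. */
    Long fetchCommittedOffset(int partition);
}

// Keeps today's behavior: write offsets directly into ZooKeeper.
class ZookeeperOffsetHandler implements OffsetHandler {
    public void commit(Map<Integer, Long> partitionOffsets) {
        // would write each offset to the ZK path used by external lag tools
    }
    public Long fetchCommittedOffset(int partition) {
        return null; // would read the offset node from ZK
    }
}

// New behavior: commit through the broker via OffsetCommitRequest
// (version=1, supported in Kafka 0.8.2 or later).
class KafkaOffsetHandler implements OffsetHandler {
    public void commit(Map<Integer, Long> partitionOffsets) {
        // would call SimpleConsumer.commitOffsets() against the broker
    }
    public Long fetchCommittedOffset(int partition) {
        return null; // would issue an OffsetFetchRequest
    }
}
```

The fetcher would then be configured with one of the two implementations, so switching the commit mode needs no change in the fetch logic itself.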
[jira] [Resolved] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages
[ https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4027. --- Resolution: Fixed Fix Version/s: 1.1.0 Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/7206b0ed > FlinkKafkaProducer09 sink can lose messages > --- > > Key: FLINK-4027 > URL: https://issues.apache.org/jira/browse/FLINK-4027 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Elias Levy >Assignee: Robert Metzger >Priority: Critical > Fix For: 1.1.0 > > > The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees. > The producer is publishing messages asynchronously. A callback can record > publishing errors, which will be raised when detected. But as far as I can > tell, there is no barrier to wait for async errors from the sink when > checkpointing or to track the event time of acked messages to inform the > checkpointing process. > If a checkpoint occurs while there are pending publish requests, and the > requests return a failure after the checkpoint occurred, those messages will > be lost as the checkpoint will consider them processed by the sink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
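A common fix direction for this class of bug is the barrier the reporter describes: count in-flight asynchronous sends and make the checkpoint wait until all of them are acknowledged, surfacing any asynchronous error at that point. A minimal sketch with all names assumed (this is not Flink's actual implementation):

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a checkpoint flush barrier for an async producer:
// a snapshot must not complete while sends are still unacknowledged.
class PendingRecordTracker {
    private final AtomicLong pending = new AtomicLong();
    private volatile Exception asyncError;

    /** Called just before handing a record to the async producer. */
    public void beforeSend() {
        pending.incrementAndGet();
    }

    /** Called from the producer callback when a send is acked or fails. */
    public synchronized void onAck(Exception error) {
        if (error != null) {
            asyncError = error;
        }
        pending.decrementAndGet();
        notifyAll();
    }

    /** Checkpoint barrier: block until every pending send is resolved. */
    public synchronized void flush() throws Exception {
        while (pending.get() > 0) {
            wait(); // released when onAck() notifies
        }
        if (asyncError != null) {
            throw asyncError; // fail the checkpoint instead of losing data
        }
    }
}
```

With such a barrier, a failed send that returns after the snapshot started still fails the checkpoint, so the records are replayed instead of silently dropped.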
[jira] [Assigned] (FLINK-4146) CliFrontendYarnAddressConfigurationTest picks wrong IP address on Travis
[ https://issues.apache.org/jira/browse/FLINK-4146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger reassigned FLINK-4146: - Assignee: Robert Metzger > CliFrontendYarnAddressConfigurationTest picks wrong IP address on Travis > > > Key: FLINK-4146 > URL: https://issues.apache.org/jira/browse/FLINK-4146 > Project: Flink > Issue Type: Bug > Components: YARN Client >Reporter: Ufuk Celebi >Assignee: Robert Metzger > Fix For: 1.1.0 > > > {code} > > CliFrontendYarnAddressConfigurationTest.testManualOptionsOverridesYarn:274->checkJobManagerAddress:424 > expected:<[ip-10-221-130-22.ec2.internal]> but was:<[10.221.130.22]> > {code} > https://s3.amazonaws.com/archive.travis-ci.org/jobs/142007244/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/142007245/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/142007246/log.txt > This is a build from a personal branch, but should also happen on master. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4146) CliFrontendYarnAddressConfigurationTest picks wrong IP address on Travis
[ https://issues.apache.org/jira/browse/FLINK-4146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4146: -- Labels: test-stability (was: ) > CliFrontendYarnAddressConfigurationTest picks wrong IP address on Travis > > > Key: FLINK-4146 > URL: https://issues.apache.org/jira/browse/FLINK-4146 > Project: Flink > Issue Type: Bug > Components: YARN Client >Reporter: Ufuk Celebi >Assignee: Robert Metzger > Labels: test-stability > Fix For: 1.1.0 > > > {code} > > CliFrontendYarnAddressConfigurationTest.testManualOptionsOverridesYarn:274->checkJobManagerAddress:424 > expected:<[ip-10-221-130-22.ec2.internal]> but was:<[10.221.130.22]> > {code} > https://s3.amazonaws.com/archive.travis-ci.org/jobs/142007244/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/142007245/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/142007246/log.txt > This is a build from a personal branch, but should also happen on master. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4147) Consider moving the file sources from the StreamExecutionEnvironment to the flink-connector-filesystem
Robert Metzger created FLINK-4147: - Summary: Consider moving the file sources from the StreamExecutionEnvironment to the flink-connector-filesystem Key: FLINK-4147 URL: https://issues.apache.org/jira/browse/FLINK-4147 Project: Flink Issue Type: Sub-task Components: DataStream API Reporter: Robert Metzger Fix For: 2.0.0 The StreamExecutionEnvironment has many convenience methods for reading static files. However, it seems that this makes it hard to maintain Flink's APIs and that not so many users are using the file sources. The filesystem connector is the logical location for persistent file reading. I suggest to move those sources there. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3397) Failed streaming jobs should fall back to the most recent checkpoint/savepoint
[ https://issues.apache.org/jira/browse/FLINK-3397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-3397: -- Component/s: State Backends, Checkpointing > Failed streaming jobs should fall back to the most recent checkpoint/savepoint > -- > > Key: FLINK-3397 > URL: https://issues.apache.org/jira/browse/FLINK-3397 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing, Streaming >Affects Versions: 1.0.0 >Reporter: Gyula Fora >Priority: Minor > > The current fallback behaviour in case of a streaming job failure is slightly > counterintuitive: > If a job fails it will fall back to the most recent checkpoint (if any) even > if there were more recent savepoint taken. This means that savepoints are not > regarded as checkpoints by the system only points from where a job can be > manually restarted. > I suggest to change this so that savepoints are also regarded as checkpoints > in case of a failure and they will also be used to automatically restore the > streaming job. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4151) Address Travis CI build time: We are exceeding the 2 hours limit
Robert Metzger created FLINK-4151: - Summary: Address Travis CI build time: We are exceeding the 2 hours limit Key: FLINK-4151 URL: https://issues.apache.org/jira/browse/FLINK-4151 Project: Flink Issue Type: Task Components: Build System Reporter: Robert Metzger We've recently started hitting the two hours limit for Travis CI. I'll look into some approaches to get our build stable again. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4151) Address Travis CI build time: We are exceeding the 2 hours limit
[ https://issues.apache.org/jira/browse/FLINK-4151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4151: -- Attachment: Pasted image at 2016_07_04 16_31.png > Address Travis CI build time: We are exceeding the 2 hours limit > > > Key: FLINK-4151 > URL: https://issues.apache.org/jira/browse/FLINK-4151 > Project: Flink > Issue Type: Task > Components: Build System >Reporter: Robert Metzger > Attachments: Pasted image at 2016_07_04 16_31.png > > > We've recently started hitting the two hours limit for Travis CI. > I'll look into some approaches to get our build stable again. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4151) Address Travis CI build time: We are exceeding the 2 hours limit
[ https://issues.apache.org/jira/browse/FLINK-4151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger reassigned FLINK-4151: - Assignee: Robert Metzger > Address Travis CI build time: We are exceeding the 2 hours limit > > > Key: FLINK-4151 > URL: https://issues.apache.org/jira/browse/FLINK-4151 > Project: Flink > Issue Type: Task > Components: Build System >Reporter: Robert Metzger >Assignee: Robert Metzger > Attachments: Pasted image at 2016_07_04 16_31.png > > > We've recently started hitting the two hours limit for Travis CI. > I'll look into some approaches to get our build stable again. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4151) Address Travis CI build time: We are exceeding the 2 hours limit
[ https://issues.apache.org/jira/browse/FLINK-4151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362211#comment-15362211 ] Robert Metzger commented on FLINK-4151: --- I attached a screenshot of the Top 20 most time-consuming tests in Flink. I don't think it's a good long-term solution to just make the tests faster: even if we manage to speed up the tests by 10 minutes, we'll have the same problem again in a few weeks. I tried splitting our tests into two groups: all tests starting with the letters A-M are in one group, tests from N-Z are in the second group. This means that we have 10 build jobs on Travis (with 5 running concurrently). The total time for the tests to finish is ~2.5 hours right now; however, you'll get an indication of whether everything builds within an hour. This is how the build matrix will look: https://travis-ci.org/rmetzger/flink/builds/142378660 > Address Travis CI build time: We are exceeding the 2 hours limit > > > Key: FLINK-4151 > URL: https://issues.apache.org/jira/browse/FLINK-4151 > Project: Flink > Issue Type: Task > Components: Build System >Reporter: Robert Metzger >Assignee: Robert Metzger > Attachments: Pasted image at 2016_07_04 16_31.png > > > We've recently started hitting the two hours limit for Travis CI. > I'll look into some approaches to get our build stable again. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
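The A-M / N-Z split can be illustrated with a toy helper (purely illustrative; the actual split would live in the Travis/Maven build configuration, not in Java code):

```java
// Toy illustration of the proposed split: test classes whose simple name
// starts with A-M go to build group 1, N-Z (and everything else) to group 2.
class TestGroupSplitter {
    static int groupOf(String testClassName) {
        char first = Character.toUpperCase(testClassName.charAt(0));
        return (first >= 'A' && first <= 'M') ? 1 : 2;
    }
}
```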
[jira] [Commented] (FLINK-4151) Address Travis CI build time: We are exceeding the 2 hours limit
[ https://issues.apache.org/jira/browse/FLINK-4151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362290#comment-15362290 ] Robert Metzger commented on FLINK-4151: --- Build #2, executing all tests, needs 2 hours. Build #2 + #7 (executing the split tests) need 1:15 and 1:14, so it's a 30-minute overhead. > Address Travis CI build time: We are exceeding the 2 hours limit > > > Key: FLINK-4151 > URL: https://issues.apache.org/jira/browse/FLINK-4151 > Project: Flink > Issue Type: Task > Components: Build System >Reporter: Robert Metzger >Assignee: Robert Metzger > Attachments: Pasted image at 2016_07_04 16_31.png > > > We've recently started hitting the two hours limit for Travis CI. > I'll look into some approaches to get our build stable again. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4151) Address Travis CI build time: We are exceeding the 2 hours limit
[ https://issues.apache.org/jira/browse/FLINK-4151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362295#comment-15362295 ] Robert Metzger commented on FLINK-4151: --- I agree. This way, we will see failing tests earlier. > Address Travis CI build time: We are exceeding the 2 hours limit > > > Key: FLINK-4151 > URL: https://issues.apache.org/jira/browse/FLINK-4151 > Project: Flink > Issue Type: Task > Components: Build System >Reporter: Robert Metzger >Assignee: Robert Metzger > Attachments: Pasted image at 2016_07_04 16_31.png > > > We've recently started hitting the two hours limit for Travis CI. > I'll look into some approaches to get our build stable again. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4152) TaskManager registration exponential backoff doesn't work
Robert Metzger created FLINK-4152: - Summary: TaskManager registration exponential backoff doesn't work Key: FLINK-4152 URL: https://issues.apache.org/jira/browse/FLINK-4152 Project: Flink Issue Type: Task Components: Distributed Coordination, TaskManager, YARN Client Reporter: Robert Metzger While testing Flink 1.1 I've found that the TaskManagers are logging many messages when registering at the JobManager. This is the log file: https://gist.github.com/rmetzger/0cebe0419cdef4507b1e8a42e33ef294 It's logging more than 3000 messages in less than a minute. I don't think that this is the expected behavior.
[jira] [Updated] (FLINK-4152) TaskManager registration exponential backoff doesn't work
[ https://issues.apache.org/jira/browse/FLINK-4152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4152: -- Issue Type: Bug (was: Task) > TaskManager registration exponential backoff doesn't work > - > > Key: FLINK-4152 > URL: https://issues.apache.org/jira/browse/FLINK-4152 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, TaskManager, YARN Client >Reporter: Robert Metzger > > While testing Flink 1.1 I've found that the TaskManagers are logging many > messages when registering at the JobManager. > This is the log file: > https://gist.github.com/rmetzger/0cebe0419cdef4507b1e8a42e33ef294 > Its logging more than 3000 messages in less than a minute. I don't think that > this is the expected behavior. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4152) TaskManager registration exponential backoff doesn't work
[ https://issues.apache.org/jira/browse/FLINK-4152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362433#comment-15362433 ] Robert Metzger commented on FLINK-4152: --- The full aggregated YARN log is more than 300 MB ... I'm not sure what the root cause of this issue is. Somewhere there seems to be a retry mechanism not using any backoff. In the JobManager, there are many messages like this {code} 2016-07-05 09:57:10,032 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 1024 megabytes memory. Pending requests: 1 2016-07-05 09:57:10,033 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 1024 megabytes memory. Pending requests: 2 2016-07-05 09:57:10,033 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 1024 megabytes memory. Pending requests: 3 2016-07-05 09:57:10,033 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 1024 megabytes memory. 
Pending requests: 4 2016-07-05 09:57:10,054 WARN org.apache.flink.yarn.YarnFlinkResourceManager - TaskManager resource registration failed for ResourceID{resourceId='container_1467643389376_0026_01_03'} java.lang.Exception: Cannot register Worker - unknown resource id ResourceID{resourceId='container_1467643389376_0026_01_03'} at org.apache.flink.yarn.YarnFlinkResourceManager.workerRegistered(YarnFlinkResourceManager.java:329) at org.apache.flink.yarn.YarnFlinkResourceManager.workerRegistered(YarnFlinkResourceManager.java:66) at org.apache.flink.runtime.clusterframework.FlinkResourceManager.handleRegisterResource(FlinkResourceManager.java:362) at org.apache.flink.runtime.clusterframework.FlinkResourceManager.handleMessage(FlinkResourceManager.java:235) at org.apache.flink.yarn.YarnFlinkResourceManager.handleMessage(YarnFlinkResourceManager.java:169) at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:90) at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:70) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) at akka.dispatch.Mailbox.run(Mailbox.scala:221) at akka.dispatch.Mailbox.exec(Mailbox.scala:231) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 2016-07-05 09:57:10,058 WARN 
org.apache.flink.yarn.YarnFlinkResourceManager - TaskManager resource registration failed for ResourceID{resourceId='container_1467643389376_0026_01_04'} java.lang.Exception: Cannot register Worker - unknown resource id ResourceID{resourceId='container_1467643389376_0026_01_04'} at org.apache.flink.yarn.YarnFlinkResourceManager.workerRegistered(YarnFlinkResourceManager.java:329) at org.apache.flink.yarn.YarnFlinkResourceManager.workerRegistered(YarnFlinkResourceManager.java:66) at org.apache.flink.runtime.clusterframework.FlinkResourceManager.handleRegisterResource(FlinkResourceManager.java:362) at org.apache.flink.runtime.clusterframework.FlinkResourceManager.handleMessage(FlinkResourceManager.java:235) at org.apache.flink.yarn.YarnFlinkResourceManager.handleMessage(YarnFlinkResourceManager.java:169) at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:90) at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:70) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) at akka.dispatch.Mailbox.run(Mailbox.scala:221) at akka.dispatch.Mailbox.exec(Mailbox.scala:231) at scala.conc
[jira] [Updated] (FLINK-4152) TaskManager registration exponential backoff doesn't work
[ https://issues.apache.org/jira/browse/FLINK-4152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4152: -- Attachment: logs.tgz
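For reference, the behavior the issue title asks for — registration retries that back off exponentially instead of flooding the JobManager — can be sketched as follows. This is a minimal illustration with made-up constants, not Flink's actual registration code:

```java
// Minimal sketch of capped exponential backoff between registration attempts.
// Constants are illustrative; this is not the actual TaskManager logic.
class RegistrationBackoff {

    static final long INITIAL_DELAY_MS = 500;
    static final long MAX_DELAY_MS = 30_000;

    // Delay before the given 0-based retry attempt: doubles each attempt,
    // capped at MAX_DELAY_MS so retries never flood the JobManager.
    static long delayForAttempt(int attempt) {
        long delay = INITIAL_DELAY_MS;
        for (int i = 0; i < attempt && delay < MAX_DELAY_MS; i++) {
            delay *= 2;
        }
        return Math.min(delay, MAX_DELAY_MS);
    }
}
```

With a cap like this, a TaskManager that cannot register settles at one attempt every 30 seconds rather than emitting thousands of messages per minute.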
[jira] [Resolved] (FLINK-3995) Properly Structure Test Utils and Dependencies
[ https://issues.apache.org/jira/browse/FLINK-3995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-3995. --- Resolution: Fixed Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/8a2c03e1 > Properly Structure Test Utils and Dependencies > -- > > Key: FLINK-3995 > URL: https://issues.apache.org/jira/browse/FLINK-3995 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.0.2 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.1.0 > > > All valuable test utils are only found in {{test-jars}}, but should be found > in the {{compile}} scope of the test util projects. > - TestLogger > - RetryRules > - MiniClusters > - TestEnvironments > - ... > Additionally, we have dependencies where the {{compile}} scope of some > projects depends on {{test-jars}} of other projects. That can create problems > in some builds and with some tools. > Here is how we can fix that: > - Create a {{flink-testutils-core}} project, which has the test utils > currently contained in the {{flink-core}} {{test-jar}} in the main scope. > That means the {{flink-core test-jar}} is not needed by other projects any > more. > - Make the Mini Cluster available in {{flink-test-utils}} main scope. > - To remove the test-jar dependency on {{flink-runtime}} from the > {{flink-test-utils}} project, we need to move the test actor classes to the > main scope in {{flink-runtime}}. > This is related to FLINK-1827 (a followup). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4133) Reflect streaming file source changes in documentation
[ https://issues.apache.org/jira/browse/FLINK-4133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4133. --- Resolution: Fixed Fix Version/s: 1.1.0 Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/0c809285 > Reflect streaming file source changes in documentation > -- > > Key: FLINK-4133 > URL: https://issues.apache.org/jira/browse/FLINK-4133 > Project: Flink > Issue Type: Bug > Components: DataStream API, Documentation >Reporter: Robert Metzger >Assignee: Kostas Kloudas > Fix For: 1.1.0 > > > In FLINK-2314 the file sources for the DataStream API were reworked. > The documentation doesn't explain the (new?) semantics of the file sources. > In which order are files read? > How are file modifications treated? (appends, in place modifications?) > I suspect this table is also not up-to-date: > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/fault_tolerance.html#fault-tolerance-guarantees-of-data-sources-and-sinks > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4146) CliFrontendYarnAddressConfigurationTest picks wrong IP address on Travis
[ https://issues.apache.org/jira/browse/FLINK-4146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4146. --- Resolution: Fixed Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/424c0826 > CliFrontendYarnAddressConfigurationTest picks wrong IP address on Travis > > > Key: FLINK-4146 > URL: https://issues.apache.org/jira/browse/FLINK-4146 > Project: Flink > Issue Type: Bug > Components: YARN Client >Reporter: Ufuk Celebi >Assignee: Robert Metzger > Labels: test-stability > Fix For: 1.1.0 > > > {code} > > CliFrontendYarnAddressConfigurationTest.testManualOptionsOverridesYarn:274->checkJobManagerAddress:424 > expected:<[ip-10-221-130-22.ec2.internal]> but was:<[10.221.130.22]> > {code} > https://s3.amazonaws.com/archive.travis-ci.org/jobs/142007244/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/142007245/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/142007246/log.txt > This is a build from a personal branch, but should also happen on master. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4151) Address Travis CI build time: We are exceeding the 2 hours limit
[ https://issues.apache.org/jira/browse/FLINK-4151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362683#comment-15362683 ] Robert Metzger commented on FLINK-4151: --- Regarding our 10 slots, we could ask Apache to increase our limit a bit if this becomes a problem. I'm using Travis with my own GitHub account for my work, so I don't care too much about the apache/flink account. Since it usually takes around 24 hours until we start looking at pull requests, there is plenty of time for Travis to run the tests. There are tools to cancel old builds of pull requests: https://github.com/grosser/travis_dedup. The problem is that Travis doesn't have access tokens for individual repositories, only for the entire Travis account. I don't think that Apache Infra will give us the access token for the Apache Travis account. But maybe we can talk to Infra and see if they can offer this as a service for all ASF Travis users? I think we've parallelized the builds as much as possible (using concurrent test execution in Maven).
[jira] [Commented] (FLINK-4137) JobManager web frontend does not shut down on OOM exception on JM
[ https://issues.apache.org/jira/browse/FLINK-4137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362783#comment-15362783 ] Robert Metzger commented on FLINK-4137: --- Here's the full log: https://gist.github.com/rmetzger/5c609f1d572e6e6209def274ab7740ae > JobManager web frontend does not shut down on OOM exception on JM > - > > Key: FLINK-4137 > URL: https://issues.apache.org/jira/browse/FLINK-4137 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, JobManager, Webfrontend >Reporter: Robert Metzger >Assignee: Till Rohrmann >Priority: Critical > > After the following Exception on the JobManager. > {code} > 2016-06-30 14:45:06,642 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 379 (in 7017 ms) > 2016-06-30 14:45:06,642 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 380 @ 1467297906642 > 2016-06-30 14:45:17,902 ERROR akka.actor.ActorSystemImpl > - Uncaught fatal error from thread > [flink-akka.remote.default-remote-dispatcher-6] shutting down ActorSystem > [flink] > java.lang.OutOfMemoryError: Java heap space > at com.google.protobuf.ByteString.copyFrom(ByteString.java:192) > at > com.google.protobuf.CodedInputStream.readBytes(CodedInputStream.java:324) > at > akka.remote.WireFormats$SerializedMessage.(WireFormats.java:3030) > at > akka.remote.WireFormats$SerializedMessage.(WireFormats.java:2980) > at > akka.remote.WireFormats$SerializedMessage$1.parsePartialFrom(WireFormats.java:3073) > at > akka.remote.WireFormats$SerializedMessage$1.parsePartialFrom(WireFormats.java:3068) > at > com.google.protobuf.CodedInputStream.readMessage(CodedInputStream.java:309) > at akka.remote.WireFormats$RemoteEnvelope.(WireFormats.java:993) > at akka.remote.WireFormats$RemoteEnvelope.(WireFormats.java:927) > at > akka.remote.WireFormats$RemoteEnvelope$1.parsePartialFrom(WireFormats.java:1049) > at > 
akka.remote.WireFormats$RemoteEnvelope$1.parsePartialFrom(WireFormats.java:1044) > at > com.google.protobuf.CodedInputStream.readMessage(CodedInputStream.java:309) > at > akka.remote.WireFormats$AckAndEnvelopeContainer.(WireFormats.java:241) > at > akka.remote.WireFormats$AckAndEnvelopeContainer.(WireFormats.java:175) > at > akka.remote.WireFormats$AckAndEnvelopeContainer$1.parsePartialFrom(WireFormats.java:279) > at > akka.remote.WireFormats$AckAndEnvelopeContainer$1.parsePartialFrom(WireFormats.java:274) > at > com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141) > at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176) > at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188) > at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193) > at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49) > at > akka.remote.WireFormats$AckAndEnvelopeContainer.parseFrom(WireFormats.java:409) > at > akka.remote.transport.AkkaPduProtobufCodec$.decodeMessage(AkkaPduCodec.scala:181) > at > akka.remote.EndpointReader.akka$remote$EndpointReader$$tryDecodeMessageAndAck(Endpoint.scala:995) > at > akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:928) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > 2016-06-30 14:45:18,502 INFO org.apache.flink.yarn.YarnJobManager > - Stopping JobManager > akka.tcp://flink@172.31.23.121:45569/user/jobmanager. 
> 2016-06-30 14:45:18,533 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: > Custom File Source (1/1) (5f2a1062c796ec6098a0a88227b9eab4) switched from > RUNNING to CANCELING > {code} > The JobManager JVM keeps running (keeping the YARN session alive) because the > web monitor is not stopped on such errors. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4137) JobManager web frontend does not shut down on OOM exception on JM
[ https://issues.apache.org/jira/browse/FLINK-4137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362824#comment-15362824 ] Robert Metzger commented on FLINK-4137: --- Thank you for looking into the problem. I will try to reproduce the issue tomorrow and check whether the configuration setting fixes it.
[jira] [Closed] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer
[ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger closed FLINK-3231. - Resolution: Fixed Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/17dfd68d Thanks a lot for your contribution [~tzulitai] > Handle Kinesis-side resharding in Kinesis streaming consumer > > > Key: FLINK-3231 > URL: https://issues.apache.org/jira/browse/FLINK-3231 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > A big difference between Kinesis shards and Kafka partitions is that Kinesis > users can choose to "merge" and "split" shards at any time for adjustable > stream throughput capacity. This article explains this quite clearly: > https://brandur.org/kinesis-by-example. > This will break the static shard-to-task mapping implemented in the basic > version of the Kinesis consumer > (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task > mapping is done in a simple round-robin-like distribution which can be > locally determined at each Flink consumer task (Flink Kafka consumer does > this too). > To handle Kinesis resharding, we will need some way to let the Flink consumer > tasks coordinate which shards they are currently handling, and allow the > tasks to ask the coordinator for a shards reassignment when the task finds > out it has found a closed shard at runtime (shards will be closed by Kinesis > when it is merged and split). > We need a centralized coordinator state store which is visible to all Flink > consumer tasks. Tasks can use this state store to locally determine what > shards it can be reassigned. 
Amazon KCL uses a DynamoDB table for the > coordination, but as described in > https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use > KCL for the implementation of the consumer if we want to leverage Flink's > checkpointing mechanics. For our own implementation, Zookeeper can be used > for this state store, but that means it would require the user to set up ZK > to work. > Since this feature introduces extensive work, it is opened as a separate > sub-task from the basic implementation > https://issues.apache.org/jira/browse/FLINK-3229. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4020) Remove shard list querying from Kinesis consumer constructor
[ https://issues.apache.org/jira/browse/FLINK-4020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4020. --- Resolution: Fixed Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/17dfd68d > Remove shard list querying from Kinesis consumer constructor > > > Key: FLINK-4020 > URL: https://issues.apache.org/jira/browse/FLINK-4020 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > Currently FlinkKinesisConsumer is querying for the whole list of shards in > the constructor, forcing the client to be able to access Kinesis as well. > This is also a drawback for handling Kinesis-side resharding, since we'd want > all shard listing / shard-to-task assigning / shard end (result of > resharding) handling logic to be capable of being independently done within > task life cycle methods, with defined and definite results. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4155) Get Kafka producer partition info in open method instead of constructor
[ https://issues.apache.org/jira/browse/FLINK-4155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15363959#comment-15363959 ] Robert Metzger commented on FLINK-4155: --- I agree that we should change this. FLINK-4023 is related. > Get Kafka producer partition info in open method instead of constructor > --- > > Key: FLINK-4155 > URL: https://issues.apache.org/jira/browse/FLINK-4155 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.1.0, 1.0.3 >Reporter: Gyula Fora > > Currently the Flink Kafka producer does not do any real error handling if > something is wrong with the partition metadata, as it is serialized with the > user function. > This means that in some cases the job can go into an error loop when using > checkpoints. Getting the partition info in the open() method would solve > this problem (similar to restarting from a savepoint, which re-runs the > constructor).
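The proposed change can be sketched with a simplified lifecycle object: metadata is fetched in open(), which runs every time the task starts on the cluster, rather than in the constructor, which only runs where the function is instantiated. The class below is a hypothetical stand-in, not Flink's producer; fetchPartitions() substitutes for a real Kafka metadata lookup such as KafkaProducer#partitionsFor:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of fetching partition metadata in open() instead of
// the constructor. fetchPartitions() stands in for a real Kafka lookup.
class LazyPartitionSink {

    private final String topic;
    // transient: never shipped with the serialized user function,
    // so no stale metadata can survive a restore
    private transient List<Integer> partitions;

    LazyPartitionSink(String topic) {
        this.topic = topic;
        // deliberately no metadata lookup here
    }

    // Mirrors RichFunction#open: runs on the cluster at every task (re)start,
    // so the metadata is always fetched freshly.
    void open() {
        this.partitions = fetchPartitions(topic);
    }

    List<Integer> getPartitions() {
        if (partitions == null) {
            throw new IllegalStateException("open() was not called");
        }
        return partitions;
    }

    // Stand-in for a metadata query against the Kafka cluster.
    static List<Integer> fetchPartitions(String topic) {
        return Arrays.asList(0, 1, 2);
    }
}
```

Because the partition list is never serialized, a recovering task cannot loop on stale metadata; it either fetches a fresh list in open() or fails fast there.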
[jira] [Created] (FLINK-4157) FlinkKafkaMetrics cause TaskManager shutdown during cancellation
Robert Metzger created FLINK-4157: - Summary: FlinkKafkaMetrics cause TaskManager shutdown during cancellation Key: FLINK-4157 URL: https://issues.apache.org/jira/browse/FLINK-4157 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.0.3 Reporter: Robert Metzger Assignee: Robert Metzger Priority: Critical Fix For: 1.1.0 The following issue was reported by a user: {code} 2016-07-05 01:32:25,113 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Sink: KafkaOutput (59/72) 2016-07-05 01:32:25,113 INFO org.apache.flink.runtime.taskmanager.Task - Sink: KafkaOutput (53/72) switched to CANCELED 2016-07-05 01:32:25,113 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Sink: KafkaOutput (53/72) 2016-07-05 01:32:25,144 ERROR akka.actor.OneForOneStrategy - java.util.ConcurrentModificationException at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) at java.util.HashMap$ValueIterator.next(HashMap.java:1458) at org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:106) at org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:211) at org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:383) at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:57) at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) at org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator.writeObject(DefaultKafkaMetricAccumulator.java:152) at sun.reflect.GeneratedMethodAccessor20859.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at java.util.HashMap.internalWriteEntries(HashMap.java:1777) at java.util.HashMap.writeObject(HashMap.java:1354) at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) at java.util.Collections$SynchronizedMap.writeObject(Collections.java:2691) at sun.reflect.GeneratedMethodAccessor226.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300) at org.apache.flink.util.SerializedValue.(SerializedValue.java:48) at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.(AccumulatorSnapshot.java:58) at 
org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:78) at org.apache.flink.runtime.taskmanager.TaskManager.unregisterTaskAndNotifyFinalState(TaskManager.scala:1150) at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:407) at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:265) at scala.runtime.AbstractPartialFunction$mcVL$sp.
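The stack trace shows Java serialization calling KafkaMetric.value() on a live metric whose internal state (the in-flight request map) is still being mutated by the producer's network thread. One way to avoid this class of problem is to keep the live metric out of serialization entirely: capture its value into a plain field on the task thread and serialize only that primitive. The sketch below is illustrative, not the actual DefaultKafkaMetricAccumulator fix:

```java
import java.io.Serializable;
import java.util.function.DoubleSupplier;

// Sketch: keep the live metric out of Java serialization entirely.
// The owning task thread calls update() to capture the current value into a
// plain double; only that primitive is serialized, so serialization never
// touches the concurrently mutated metric internals. Illustrative only,
// not the actual DefaultKafkaMetricAccumulator fix.
class MetricSnapshot implements Serializable {

    private static final long serialVersionUID = 1L;

    // Live metric; excluded from serialization via transient.
    private transient DoubleSupplier liveMetric;
    private double capturedValue;

    MetricSnapshot(DoubleSupplier liveMetric) {
        this.liveMetric = liveMetric;
    }

    // Called on the task thread, never during serialization.
    void update() {
        this.capturedValue = liveMetric.getAsDouble();
    }

    double value() {
        return capturedValue;
    }
}
```

With this shape, writeObject() only copies a double, so no ConcurrentModificationException can be thrown while the TaskManager snapshots accumulators.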
[jira] [Created] (FLINK-4160) YARN session doesn't show input validation errors
Robert Metzger created FLINK-4160: - Summary: YARN session doesn't show input validation errors Key: FLINK-4160 URL: https://issues.apache.org/jira/browse/FLINK-4160 Project: Flink Issue Type: Bug Components: YARN Client Affects Versions: 1.1.0 Reporter: Robert Metzger Priority: Critical Setting the JobManager memory size below 768 MB causes this error: {code} ~/flink/build-target$ ./bin/yarn-session.sh -n 5 -s 4 -jm 512 Error while starting the YARN Client. Please check log output! {code} The problem is that the logs don't contain any information.
[jira] [Commented] (FLINK-4137) JobManager web frontend does not shut down on OOM exception on JM
[ https://issues.apache.org/jira/browse/FLINK-4137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15364082#comment-15364082 ] Robert Metzger commented on FLINK-4137: --- Thanks a lot for the good analysis of the issue [~till.rohrmann]. I changed the configuration parameter to "on" and the JobManager shut down properly:
{code}
2016-07-06 10:04:02,727 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 14 (in 1542 ms)
2016-07-06 10:04:06,093 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 15 @ 1467799446093
2016-07-06 10:04:07,832 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 15 (in 1646 ms)
2016-07-06 10:04:11,092 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 16 @ 1467799451092
2016-07-06 10:04:13,140 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 16 (in 1940 ms)
2016-07-06 10:04:16,092 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 17 @ 1467799456092
2016-07-06 10:04:19,859 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 17 (in 3520 ms)
2016-07-06 10:04:21,092 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 18 @ 1467799461092
2016-07-06 10:04:26,495 ERROR akka.actor.ActorSystemImpl - Uncaught error from thread [flink-akka.remote.default-remote-dispatcher-5] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.OutOfMemoryError: Java heap space
	at java.lang.reflect.Array.newArray(Native Method)
	at java.lang.reflect.Array.newInstance(Array.java:70)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1670)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
	at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
	at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
	at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
	at scala.util.Try$.apply(Try.scala:161)
	at akka.serialization.Serialization.deserialize(Serialization.scala:98)
	at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
	at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
	at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
	at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
	at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
2016-07-06 10:04:26,496 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job e039626ba2dad012ef06e13ab84741fc
2016-07-06 10:04:26,495 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job e039626ba2dad012ef06e13ab84741fc
2016-07-06 10:04:26,498 INFO org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Removing web dashboard root cache directory /tmp/flink-web-c735e6c2-a9eb-4a56-91e3-215d3e8821b1
2016-07-06 10:04:26,608 INFO org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Removing web dashboard jar upload directory /tmp/flink-web-upload-1cb8d9b5-9b92-4498-9ab4-a993e72827c7
2016-07-06 10:04:26,784 INFO org.apache.flink.runtime.blob.BlobServ
[jira] [Resolved] (FLINK-4151) Address Travis CI build time: We are exceeding the 2 hours limit
[ https://issues.apache.org/jira/browse/FLINK-4151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4151. --- Resolution: Fixed Resolved for now by splitting the builds: http://git-wip-us.apache.org/repos/asf/flink/commit/f1499388 > Address Travis CI build time: We are exceeding the 2 hours limit > > > Key: FLINK-4151 > URL: https://issues.apache.org/jira/browse/FLINK-4151 > Project: Flink > Issue Type: Task > Components: Build System >Reporter: Robert Metzger >Assignee: Robert Metzger > Attachments: Pasted image at 2016_07_04 16_31.png > > > We've recently started hitting the two hours limit for Travis CI. > I'll look into some approaches to get our build stable again. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4137) JobManager web frontend does not shut down on OOM exception on JM
[ https://issues.apache.org/jira/browse/FLINK-4137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4137. --- Resolution: Fixed Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/cdd5e4ca > JobManager web frontend does not shut down on OOM exception on JM > - > > Key: FLINK-4137 > URL: https://issues.apache.org/jira/browse/FLINK-4137 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, JobManager, Webfrontend >Reporter: Robert Metzger >Assignee: Till Rohrmann >Priority: Critical > > After the following Exception on the JobManager. > {code}
> 2016-06-30 14:45:06,642 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 379 (in 7017 ms)
> 2016-06-30 14:45:06,642 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 380 @ 1467297906642
> 2016-06-30 14:45:17,902 ERROR akka.actor.ActorSystemImpl - Uncaught fatal error from thread [flink-akka.remote.default-remote-dispatcher-6] shutting down ActorSystem [flink]
> java.lang.OutOfMemoryError: Java heap space
> at com.google.protobuf.ByteString.copyFrom(ByteString.java:192)
> at com.google.protobuf.CodedInputStream.readBytes(CodedInputStream.java:324)
> at akka.remote.WireFormats$SerializedMessage.<init>(WireFormats.java:3030)
> at akka.remote.WireFormats$SerializedMessage.<init>(WireFormats.java:2980)
> at akka.remote.WireFormats$SerializedMessage$1.parsePartialFrom(WireFormats.java:3073)
> at akka.remote.WireFormats$SerializedMessage$1.parsePartialFrom(WireFormats.java:3068)
> at com.google.protobuf.CodedInputStream.readMessage(CodedInputStream.java:309)
> at akka.remote.WireFormats$RemoteEnvelope.<init>(WireFormats.java:993)
> at akka.remote.WireFormats$RemoteEnvelope.<init>(WireFormats.java:927)
> at akka.remote.WireFormats$RemoteEnvelope$1.parsePartialFrom(WireFormats.java:1049)
> at akka.remote.WireFormats$RemoteEnvelope$1.parsePartialFrom(WireFormats.java:1044)
> at com.google.protobuf.CodedInputStream.readMessage(CodedInputStream.java:309)
> at akka.remote.WireFormats$AckAndEnvelopeContainer.<init>(WireFormats.java:241)
> at akka.remote.WireFormats$AckAndEnvelopeContainer.<init>(WireFormats.java:175)
> at akka.remote.WireFormats$AckAndEnvelopeContainer$1.parsePartialFrom(WireFormats.java:279)
> at akka.remote.WireFormats$AckAndEnvelopeContainer$1.parsePartialFrom(WireFormats.java:274)
> at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141)
> at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
> at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
> at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
> at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
> at akka.remote.WireFormats$AckAndEnvelopeContainer.parseFrom(WireFormats.java:409)
> at akka.remote.transport.AkkaPduProtobufCodec$.decodeMessage(AkkaPduCodec.scala:181)
> at akka.remote.EndpointReader.akka$remote$EndpointReader$$tryDecodeMessageAndAck(Endpoint.scala:995)
> at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:928)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> 2016-06-30 14:45:18,502 INFO org.apache.flink.yarn.YarnJobManager - Stopping JobManager akka.tcp://flink@172.31.23.121:45569/user/jobmanager.
> 2016-06-30 14:45:18,533 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom File Source (1/1) (5f2a1062c796ec6098a0a88227b9eab4) switched from RUNNING to CANCELING
> {code}
> The JobManager JVM keeps running (keeping the YARN session alive) because the web monitor is not stopped on such errors. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-3034) Redis Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-3034. --- Resolution: Fixed Fix Version/s: 1.1.0 Merged in: http://git-wip-us.apache.org/repos/asf/flink/commit/3ab9e36c Thanks a lot for your contribution! > Redis Sink Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > Fix For: 1.1.0 > > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4156) Job with -m yarn-cluster registers TaskManagers to another running Yarn session
[ https://issues.apache.org/jira/browse/FLINK-4156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15365978#comment-15365978 ] Robert Metzger commented on FLINK-4156: --- [~aljoscha]: This only happens if the same zookeeper namespace has been configured for both jobs. This can easily happen if you put the zookeeper namespace into the conf/flink-conf.yaml file, because then both the YARN session and the per-job yarn cluster use the same config. But if users put different namespaces into the config between the jobs (or they pass them using ./bin/yarn-session.sh -Dzk.foo=bar) then it will work properly. As Stefan said, an easy workaround would be setting a UUID by default in the YARN code. > Job with -m yarn-cluster registers TaskManagers to another running Yarn > session > --- > > Key: FLINK-4156 > URL: https://issues.apache.org/jira/browse/FLINK-4156 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, YARN Client >Reporter: Stefan Richter > > When a job is started using cluster mode (-m yarn-cluster) and a Yarn session > is running on the same cluster, the job accidentally registers its worker > tasks with the ongoing Yarn session. This happens because the same Zookeeper > namespace is used. > We should consider isolating Flink applications from one another by using UUIDs, > e.g. based on their application ids, in their Zookeeper paths. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
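A sketch of the configuration-based workaround described in the comment above. All values are illustrative, not from this ticket: the key {{recovery.zookeeper.path.root}} is the ZooKeeper namespace option from the Flink 1.x HA documentation, and {{-yD}} passes dynamic properties to a per-job YARN cluster; check the docs for your version.
{code}
# Long-running YARN session with its own ZooKeeper namespace:
./bin/yarn-session.sh -n 4 -Drecovery.zookeeper.path.root=/flink/session-a

# Per-job YARN cluster with a different namespace, so its TaskManagers
# cannot register with the session's JobManager:
./bin/flink run -m yarn-cluster -yn 4 \
  -yDrecovery.zookeeper.path.root=/flink/job-b ./examples/WordCount.jar
{code}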
[jira] [Updated] (FLINK-4156) Job with -m yarn-cluster registers TaskManagers to another running Yarn session
[ https://issues.apache.org/jira/browse/FLINK-4156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4156: -- Component/s: YARN Client > Job with -m yarn-cluster registers TaskManagers to another running Yarn > session > --- > > Key: FLINK-4156 > URL: https://issues.apache.org/jira/browse/FLINK-4156 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, YARN Client >Reporter: Stefan Richter > > When a job is started using cluster mode (-m yarn-cluster) and a Yarn session > is running on the same cluster, the job accidentally registers its worker > tasks with the ongoing Yarn session. This happens because the same Zookeeper > namespace is used. > We should consider isolating Flink applications from one another by using UUIDs, > e.g. based on their application ids, in their Zookeeper paths. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4168) ForkableFlinkMiniCluster not available in Kinesis connector tests
[ https://issues.apache.org/jira/browse/FLINK-4168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366040#comment-15366040 ] Robert Metzger commented on FLINK-4168: --- Ufuk just published a hotfix for this issue to the master: http://git-wip-us.apache.org/repos/asf/flink/commit/9fa54cc3 > ForkableFlinkMiniCluster not available in Kinesis connector tests > - > > Key: FLINK-4168 > URL: https://issues.apache.org/jira/browse/FLINK-4168 > Project: Flink > Issue Type: Bug > Components: Build System, Kinesis Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > We seem to have left out updating the `flink-test-utils` dependency in the > Kinesis connector for the recent test utils dependency restructure in > https://issues.apache.org/jira/browse/FLINK-3995, so it's not building > properly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4168) ForkableFlinkMiniCluster not available in Kinesis connector tests
[ https://issues.apache.org/jira/browse/FLINK-4168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4168: -- Component/s: Build System > ForkableFlinkMiniCluster not available in Kinesis connector tests > - > > Key: FLINK-4168 > URL: https://issues.apache.org/jira/browse/FLINK-4168 > Project: Flink > Issue Type: Bug > Components: Build System, Kinesis Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > We seem to have left out updating the `flink-test-utils` dependency in the > Kinesis connector for the recent test utils dependency restructure in > https://issues.apache.org/jira/browse/FLINK-3995, so it's not building > properly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4170) Remove `CONFIG_` prefix from KinesisConfigConstants variables
[ https://issues.apache.org/jira/browse/FLINK-4170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366086#comment-15366086 ] Robert Metzger commented on FLINK-4170: --- +1 > Remove `CONFIG_` prefix from KinesisConfigConstants variables > - > > Key: FLINK-4170 > URL: https://issues.apache.org/jira/browse/FLINK-4170 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Ufuk Celebi > Fix For: 1.1.0 > > > I find the static variable names verbose. I think it's clear from context > that they refer to the Kinesis configuration since they are all gathered in > that class. > Therefore would like to remove the {{CONFIG_}} prefix before the release, so > that we have > {code} > conf.put(KinesisConfigConstants.AWS_REGION, "") > {code} > instead of > {code} > conf.put(KinesisConfigConstants.CONFIG_AWS_REGION, "") > {code} > For longer variables it becomes even longer otherwise. > --- > Some basic variable names that might be accessed frequently are also very > long: > {code} > CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY > CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID > {code} > It might suffice to just have: > {code} > AWS_SECRET_KEY > AWS_ACCESS_KEY > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4169) CEP Does Not Work with RocksDB StateBackend
[ https://issues.apache.org/jira/browse/FLINK-4169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366088#comment-15366088 ] Robert Metzger commented on FLINK-4169: --- Adding rocksdb as test-scoped dependency to the CEP library should not be an issue. > CEP Does Not Work with RocksDB StateBackend > --- > > Key: FLINK-4169 > URL: https://issues.apache.org/jira/browse/FLINK-4169 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.0.0, 1.1.0, 1.0.1, 1.0.2, 1.0.3 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > > A job will never match any patterns because {{ValueState.update()}} is not > called in the keyed CEP operators for updating the {{NFA}} state and the > priority queue state. > The reason why it works for other state backends is that they are very lax in > their handling of state: if the object returned from {{ValueState.value()}} > is mutable, changes to it will be reflected in checkpoints even if > {{ValueState.update()}} is not called. RocksDB, on the other hand, does > always deserialize/serialize state values when accessing/updating them, so > changes to the returned object will not be reflected in the state unless > {{update()}} is called. > We should fix this and also add a test for it. This might be tricky because > we have to pull together RocksDB and CEP. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
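The heap-backend vs. RocksDB difference described in this ticket can be illustrated with a small self-contained sketch. This is not Flink code: {{SketchValueState}} below is a hypothetical stand-in for {{ValueState}}, modeling only the one behavioral difference that matters here (a serializing backend effectively hands out a fresh copy on every read).

```java
import java.util.function.UnaryOperator;

// Minimal stand-in for a keyed value state; NOT Flink's API. A heap-like
// backend returns the stored object itself, while a RocksDB-like backend
// serializes/deserializes on access, i.e. returns a copy.
public class StateMutationDemo {

    static class SketchValueState<T> {
        private T stored;
        private final boolean copyOnRead;      // true ~ RocksDB-like backend
        private final UnaryOperator<T> copier; // stands in for ser/deser

        SketchValueState(boolean copyOnRead, UnaryOperator<T> copier) {
            this.copyOnRead = copyOnRead;
            this.copier = copier;
        }

        T value() {
            return (copyOnRead && stored != null) ? copier.apply(stored) : stored;
        }

        void update(T v) { stored = v; }       // the explicit write-back
    }

    /** Mutates the object returned by value(); calls update() only if told to. */
    static String mutateAndRead(boolean copyOnRead, boolean callUpdate) {
        SketchValueState<StringBuilder> state =
                new SketchValueState<>(copyOnRead, sb -> new StringBuilder(sb));
        state.update(new StringBuilder("a"));
        StringBuilder sb = state.value();
        sb.append("b");                        // in-place mutation
        if (callUpdate) {
            state.update(sb);                  // the call missing in FLINK-4169
        }
        return state.value().toString();
    }

    public static void main(String[] args) {
        // Heap-like: the mutation "sticks" even without update(), masking the bug.
        System.out.println(mutateAndRead(false, false)); // ab
        // RocksDB-like: the mutation hit a copy and is lost without update().
        System.out.println(mutateAndRead(true, false));  // a
        // RocksDB-like with the explicit update(): persisted as expected.
        System.out.println(mutateAndRead(true, true));   // ab
    }
}
```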
[jira] [Commented] (FLINK-4156) Job with -m yarn-cluster registers TaskManagers to another running Yarn session
[ https://issues.apache.org/jira/browse/FLINK-4156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366126#comment-15366126 ] Robert Metzger commented on FLINK-4156: --- Isn't this issue the same as FLINK-4166? > Job with -m yarn-cluster registers TaskManagers to another running Yarn > session > --- > > Key: FLINK-4156 > URL: https://issues.apache.org/jira/browse/FLINK-4156 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, YARN Client >Reporter: Stefan Richter > > When a job is started using cluster mode (-m yarn-cluster) and a Yarn session > is running on the same cluster, the job accidentally registers its worker > tasks with the ongoing Yarn session. This happens because the same Zookeeper > namespace is used. > We should consider isolating Flink applications from one another by using UUIDs, > e.g. based on their application ids, in their Zookeeper paths. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15367444#comment-15367444 ] Robert Metzger commented on FLINK-4022: --- Hi [~tzulitai], Yes, go ahead and assign the JIRA to yourself. I'm not aware of anybody else working on this. I would prefer to merge this feature after the 1.1 release to give us more time to test it properly. I also think that the implementation is going to be somewhat similar to the Kinesis implementation. I don't expect any issues from Kafka with the constant partition fetching, but we have to do some experiments to find that out. Please decide yourself whether it makes sense to fix the related issues before this one or together with this one. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using the Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumer in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit the last offsets of the partitions it is currently responsible for to > an external store (or Kafka) before a rebalance; after the rebalance, when the > subtasks get the new partitions they'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions they are holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to the previous global state, therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assignment depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assignment to subtasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from a fixed list of topics), > KafkaConsumer#subscribe() has a corresponding overload for a fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigne
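The offset hand-off around a rebalance proposed above can be sketched standalone. None of this is Kafka or Flink API: {{ExternalStore}}, {{SubtaskConsumer}}, and the two callback methods are hypothetical stand-ins for the external offset store and for {{ConsumerRebalanceListener}}'s onPartitionsRevoked/onPartitionsAssigned contract.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.TreeMap;

// Stand-in for whatever store (or Kafka itself) holds the last committed
// offset per partition across rebalances.
class ExternalStore {
    private final Map<Integer, Long> offsets = new HashMap<>();
    void commit(int partition, long offset) { offsets.put(partition, offset); }
    // Missing entry => brand-new partition, start from offset 0.
    long lastOffset(int partition) { return offsets.getOrDefault(partition, 0L); }
}

class SubtaskConsumer {
    private final ExternalStore store;
    private final Map<Integer, Long> current = new TreeMap<>(); // partition -> next offset

    SubtaskConsumer(ExternalStore store) { this.store = store; }

    // Analogue of onPartitionsRevoked: persist progress before Kafka
    // takes the partitions away.
    void onPartitionsRevoked() {
        current.forEach(store::commit);
        current.clear();
    }

    // Analogue of onPartitionsAssigned: resume each newly assigned
    // partition from the externally stored offset.
    void onPartitionsAssigned(List<Integer> partitions) {
        for (int p : partitions) current.put(p, store.lastOffset(p));
    }

    void consume(int partition, int records) {
        current.merge(partition, (long) records, Long::sum);
    }

    Map<Integer, Long> positions() { return current; }
}

public class RebalanceSketch {
    public static void main(String[] args) {
        ExternalStore store = new ExternalStore();

        SubtaskConsumer a = new SubtaskConsumer(store);
        a.onPartitionsAssigned(Arrays.asList(0, 1));
        a.consume(0, 5);
        a.consume(1, 3);
        a.onPartitionsRevoked();                     // rebalance begins

        SubtaskConsumer b = new SubtaskConsumer(store);
        b.onPartitionsAssigned(Arrays.asList(1, 2)); // partition 1 moved over, 2 is new
        System.out.println(b.positions());           // {1=3, 2=0}
    }
}
```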
[jira] [Commented] (FLINK-3924) Remove protobuf shading from Kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-3924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15367445#comment-15367445 ] Robert Metzger commented on FLINK-3924: --- No. I could not find any good solution for the issue. I've talked with Amazon and they are looking into different approaches. I hope that we can fix the issue for 1.2. For now, we'll need to rely on users building the connector themselves. > Remove protobuf shading from Kinesis connector > -- > > Key: FLINK-3924 > URL: https://issues.apache.org/jira/browse/FLINK-3924 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: 1.2.0 > > > The Kinesis connector is currently creating a fat jar with a custom protobuf > version (2.6.1), relocated into a different package. > We need to build the fat jar to change the protobuf calls from the original > protobuf to the relocated one. > Because Kinesis is licensed under the Amazon Software License (which is not > entirely compatible with the ASL2.0), I don't want to deploy kinesis connector binaries to > maven central with the releases. These binaries would contain code from > Amazon. It would be more than just linking to an (optional) dependency. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3924) Remove protobuf shading from Kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-3924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-3924: -- Fix Version/s: (was: 1.1.0) 1.2.0 > Remove protobuf shading from Kinesis connector > -- > > Key: FLINK-3924 > URL: https://issues.apache.org/jira/browse/FLINK-3924 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: 1.2.0 > > > The Kinesis connector is currently creating a fat jar with a custom protobuf > version (2.6.1), relocated into a different package. > We need to build the fat jar to change the protobuf calls from the original > protobuf to the relocated one. > Because Kinesis is licensed under the Amazon Software License (which is not > entirely compatible with the ASL2.0), I don't want to deploy kinesis connector binaries to > maven central with the releases. These binaries would contain code from > Amazon. It would be more than just linking to an (optional) dependency. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4177) CassandraConnectorTest.testCassandraCommitter causing unstable builds
Robert Metzger created FLINK-4177: - Summary: CassandraConnectorTest.testCassandraCommitter causing unstable builds Key: FLINK-4177 URL: https://issues.apache.org/jira/browse/FLINK-4177 Project: Flink Issue Type: Bug Components: Cassandra Connector, Streaming Connectors Affects Versions: 1.1.0 Reporter: Robert Metzger This build: https://api.travis-ci.org/jobs/143272982/log.txt?deansi=true failed with {code}
07/08/2016 09:59:12 Job execution switched to status FINISHED.
Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 146.646 sec <<< FAILURE! - in org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest
testCassandraCommitter(org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest) Time elapsed: 9.057 sec <<< ERROR!
com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency LOCAL_SERIAL (1 replica were required but only 0 acknowledged the write)
	at com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:73)
	at com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:26)
	at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
	at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
	at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:63)
	at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39)
	at org.apache.flink.streaming.connectors.cassandra.CassandraCommitter.open(CassandraCommitter.java:103)
	at org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest.testCassandraCommitter(CassandraConnectorTest.java:284)
Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency LOCAL_SERIAL (1 replica were required but only 0 acknowledged the write)
	at com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:100)
	at com.datastax.driver.core.Responses$Error.asException(Responses.java:122)
	at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:477)
	at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005)
	at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:276)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:263)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:329)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
	at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency LOCAL_SERIAL (1 replica were required but only 0 acknowledged the write)
	at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:59)
	at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:37)
	at com.datastax.driv
[jira] [Created] (FLINK-4178) CoStreamCheckpointingITCase deadlocks
Robert Metzger created FLINK-4178: - Summary: CoStreamCheckpointingITCase deadlocks Key: FLINK-4178 URL: https://issues.apache.org/jira/browse/FLINK-4178 Project: Flink Issue Type: Bug Components: DataStream API, Windowing Operators Affects Versions: 1.1.0 Reporter: Robert Metzger I didn't analyze what exactly caused the program to stop making any progress, but it looks like some locking issue somewhere. Here is the log: https://s3.amazonaws.com/archive.travis-ci.org/jobs/14339/log.txt It contains many traces like this {code}
"OutputFlusher" daemon prio=10 tid=0x7fd240853000 nid=0xb026 waiting for monitor entry [0x7fd21a5ee000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(RecordWriter.java:173)
	- waiting to lock <0xfdd76fb8> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
	at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:176)

"Map -> Map (7/12)" daemon prio=10 tid=0x7fd24d0d5800 nid=0xb023 in Object.wait() [0x7fd21a6ef000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0xfdd76b78> (a java.util.ArrayDeque)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
	- locked <0xfdd76b78> (a java.util.ArrayDeque)
{code} and some of these: {code}
"Async calls on Source: Custom Source -> Filter (7/12)" daemon prio=10 tid=0x7fd22844c000 nid=0xb0cc waiting for monitor entry [0x7fd1faa7b000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:557)
	- waiting to lock <0xfde15960> (a java.lang.Object)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:541)
	at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:945)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
{code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4176) Travis build fails at flink-connector-kinesis for JDK: openjdk7
[ https://issues.apache.org/jira/browse/FLINK-4176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4176. --- Resolution: Fixed Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/230847d5 > Travis build fails at flink-connector-kinesis for JDK: openjdk7 > --- > > Key: FLINK-4176 > URL: https://issues.apache.org/jira/browse/FLINK-4176 > Project: Flink > Issue Type: Bug > Components: Build System, Kinesis Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.1.0 > > > Kinesis consumer code is using the static `Long.hashCode()` method, which is > supported only since Java 1.8. Changing to use > `Long.valueOf(long).hashCode()` should fix this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4176) Travis build fails at flink-connector-kinesis for JDK: openjdk7
[ https://issues.apache.org/jira/browse/FLINK-4176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4176: -- Component/s: Streaming Connectors Kinesis Connector Build System > Travis build fails at flink-connector-kinesis for JDK: openjdk7 > --- > > Key: FLINK-4176 > URL: https://issues.apache.org/jira/browse/FLINK-4176 > Project: Flink > Issue Type: Bug > Components: Build System, Kinesis Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.1.0 > > > Kinesis consumer code is using the static `Long.hashCode()` method, which is > supported only since Java 1.8. Changing to use > `Long.valueOf(long).hashCode()` should fix this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3516) JobManagerHACheckpointRecoveryITCase testCheckpointedStreamingSumProgram fails
[ https://issues.apache.org/jira/browse/FLINK-3516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15369025#comment-15369025 ] Robert Metzger commented on FLINK-3516: --- Once again https://s3.amazonaws.com/archive.travis-ci.org/jobs/143311423/log.txt > JobManagerHACheckpointRecoveryITCase testCheckpointedStreamingSumProgram fails > -- > > Key: FLINK-3516 > URL: https://issues.apache.org/jira/browse/FLINK-3516 > Project: Flink > Issue Type: Test > Components: Distributed Coordination, JobManager >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Labels: test-stability > > https://s3.amazonaws.com/archive.travis-ci.org/jobs/111782050/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3516) JobManagerHACheckpointRecoveryITCase testCheckpointedStreamingSumProgram fails
[ https://issues.apache.org/jira/browse/FLINK-3516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-3516: -- Component/s: JobManager Distributed Coordination > JobManagerHACheckpointRecoveryITCase testCheckpointedStreamingSumProgram fails > -- > > Key: FLINK-3516 > URL: https://issues.apache.org/jira/browse/FLINK-3516 > Project: Flink > Issue Type: Test > Components: Distributed Coordination, JobManager >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Labels: test-stability > > https://s3.amazonaws.com/archive.travis-ci.org/jobs/111782050/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3516) JobManagerHACheckpointRecoveryITCase testCheckpointedStreamingSumProgram fails
[ https://issues.apache.org/jira/browse/FLINK-3516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-3516: -- Labels: test-stability (was: ) > JobManagerHACheckpointRecoveryITCase testCheckpointedStreamingSumProgram fails > -- > > Key: FLINK-3516 > URL: https://issues.apache.org/jira/browse/FLINK-3516 > Project: Flink > Issue Type: Test > Components: Distributed Coordination, JobManager >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Labels: test-stability > > https://s3.amazonaws.com/archive.travis-ci.org/jobs/111782050/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4019. --- Resolution: Fixed Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/68277a43 > Expose approximateArrivalTimestamp through the KinesisDeserializationSchema > interface > - > > Key: FLINK-4019 > URL: https://issues.apache.org/jira/browse/FLINK-4019 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > Amazon's Record class also gives information about the timestamp of when > Kinesis successfully receives the record: > http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp(). > This should be useful info for users and should be exposed through the > deserialization schema. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4186) Expose Kafka metrics through Flink metrics
Robert Metzger created FLINK-4186: - Summary: Expose Kafka metrics through Flink metrics Key: FLINK-4186 URL: https://issues.apache.org/jira/browse/FLINK-4186 Project: Flink Issue Type: Improvement Components: Kafka Connector Affects Versions: 1.1.0 Reporter: Robert Metzger Assignee: Robert Metzger Currently, we expose the Kafka metrics through Flink's accumulators. We can now use the metrics system in Flink to report Kafka metrics. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4170) Remove `CONFIG_` prefix from KinesisConfigConstants variables
[ https://issues.apache.org/jira/browse/FLINK-4170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15370305#comment-15370305 ] Robert Metzger commented on FLINK-4170: --- +1 > Remove `CONFIG_` prefix from KinesisConfigConstants variables > - > > Key: FLINK-4170 > URL: https://issues.apache.org/jira/browse/FLINK-4170 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Ufuk Celebi >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > I find the static variable names verbose. I think it's clear from context > that they refer to the Kinesis configuration since they are all gathered in > that class. > Therefore would like to remove the {{CONFIG_}} prefix before the release, so > that we have > {code} > conf.put(KinesisConfigConstants.AWS_REGION, "") > {code} > instead of > {code} > conf.put(KinesisConfigConstants.CONFIG_AWS_REGION, "") > {code} > For longer variables it becomes even longer otherwise. > --- > Some basic variable names that might be accessed frequently are also very > long: > {code} > CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY > CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID > {code} > It might suffice to just have: > {code} > AWS_SECRET_KEY > AWS_ACCESS_KEY > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4185) Reflecting rename from Tachyon to Alluxio
[ https://issues.apache.org/jira/browse/FLINK-4185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4185. --- Resolution: Fixed Fix Version/s: 1.1.0 Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/5df0b275 > Reflecting rename from Tachyon to Alluxio > - > > Key: FLINK-4185 > URL: https://issues.apache.org/jira/browse/FLINK-4185 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.0.3 >Reporter: Jiri Simsa >Priority: Trivial > Fix For: 1.1.0 > > > The Tachyon project has been renamed to Alluxio earlier this year. The goal > of this issue is to reflect this in the Flink documentation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4178) CoStreamCheckpointingITCase deadlocks
[ https://issues.apache.org/jira/browse/FLINK-4178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15370609#comment-15370609 ] Robert Metzger commented on FLINK-4178: --- It happened again: https://s3.amazonaws.com/archive.travis-ci.org/jobs/14339/log.txt > CoStreamCheckpointingITCase deadlocks > - > > Key: FLINK-4178 > URL: https://issues.apache.org/jira/browse/FLINK-4178 > Project: Flink > Issue Type: Bug > Components: DataStream API, Windowing Operators >Affects Versions: 1.1.0 >Reporter: Robert Metzger > Labels: test-stability > > I didn't analyze what exactly caused the program to stop making any progress, > but it looks like some locking issue somewhere. > Here is the log: > https://s3.amazonaws.com/archive.travis-ci.org/jobs/14339/log.txt > It contains many traces like this > {code} > OutputFlusher" daemon prio=10 tid=0x7fd240853000 nid=0xb026 waiting for > monitor entry [0x7fd21a5ee000] >java.lang.Thread.State: BLOCKED (on object monitor) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(RecordWriter.java:173) > - waiting to lock <0xfdd76fb8> (a > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:176) > "Map -> Map (7/12)" daemon prio=10 tid=0x7fd24d0d5800 nid=0xb023 in > Object.wait() [0x7fd21a6ef000] >java.lang.Thread.State: TIMED_WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > - waiting on <0xfdd76b78> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163) > - locked <0xfdd76b78> (a java.util.ArrayDeque) > {code} > and some of these: > {code} > "Async calls on Source: Custom Source -> Filter (7/12)" daemon prio=10 > tid=0x7fd22844c000 nid=0xb0cc waiting for monitor entry > [0x7fd1faa7b000] >java.lang.Thread.State: BLOCKED (on object monitor) > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:557) > - waiting to lock <0xfde15960> (a java.lang.Object) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:541) > at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:945) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at java.util.concurrent.FutureTask.run(FutureTask.java:262) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4157) FlinkKafkaMetrics cause TaskManager shutdown during cancellation
[ https://issues.apache.org/jira/browse/FLINK-4157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4157. --- Resolution: Fixed Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/36aad48e > FlinkKafkaMetrics cause TaskManager shutdown during cancellation > > > Key: FLINK-4157 > URL: https://issues.apache.org/jira/browse/FLINK-4157 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Robert Metzger >Assignee: Robert Metzger >Priority: Critical > Fix For: 1.1.0 > > > The following issue was reported by a user: > {code} > 2016-07-05 01:32:25,113 INFO org.apache.flink.runtime.taskmanager.Task > - Freeing task resources for Sink: KafkaOutput (59/72) > 2016-07-05 01:32:25,113 INFO org.apache.flink.runtime.taskmanager.Task > - Sink: KafkaOutput (53/72) switched to CANCELED > 2016-07-05 01:32:25,113 INFO org.apache.flink.runtime.taskmanager.Task > - Freeing task resources for Sink: KafkaOutput (53/72) > 2016-07-05 01:32:25,144 ERROR akka.actor.OneForOneStrategy > - > java.util.ConcurrentModificationException > at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) > at java.util.HashMap$ValueIterator.next(HashMap.java:1458) > at > org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:106) > at > org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:211) > at > org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:383) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:57) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) > at > org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator.writeObject(DefaultKafkaMetricAccumulator.java:152) > at sun.reflect.GeneratedMethodAccessor20859.invoke(Unknown Source) > at > 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at java.util.HashMap.internalWriteEntries(HashMap.java:1777) > at java.util.HashMap.writeObject(HashMap.java:1354) > at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) > at > java.util.Collections$SynchronizedMap.writeObject(Collections.java:2691) > at sun.reflect.GeneratedMethodAccessor226.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at 
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300) > at org.apache.flink.util.SerializedValue.(SerializedValue.java:48) > at > org.apache.flink.runtime.accumulators.AccumulatorSnapshot.(AccumulatorSnapshot.java:58) > at > org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:78) > at > org.apache.flink.runtime.taskmanager.TaskManager.unregisterTaskAndNotifyFinalState(
[jira] [Created] (FLINK-4191) Expose shard information in KinesisDeserializationSchema
Robert Metzger created FLINK-4191: - Summary: Expose shard information in KinesisDeserializationSchema Key: FLINK-4191 URL: https://issues.apache.org/jira/browse/FLINK-4191 Project: Flink Issue Type: Sub-task Components: Kinesis Connector, Streaming Connectors Affects Versions: 1.1.0 Reporter: Robert Metzger Assignee: Robert Metzger Fix For: 1.1.0 Currently, we are not exposing the Shard ID and other shard-related information in the deserialization schema. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4194) KinesisDeserializationSchema.isEndOfStream() is never called
Robert Metzger created FLINK-4194: - Summary: KinesisDeserializationSchema.isEndOfStream() is never called Key: FLINK-4194 URL: https://issues.apache.org/jira/browse/FLINK-4194 Project: Flink Issue Type: Sub-task Components: Kinesis Connector Affects Versions: 1.1.0 Reporter: Robert Metzger The Kinesis connector does not respect the {{KinesisDeserializationSchema.isEndOfStream()}} method. The purpose of this method is to stop consuming from a source, based on input data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
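The intended contract of `isEndOfStream()` can be illustrated with a simplified, hypothetical schema. The real `KinesisDeserializationSchema` interface takes additional shard- and record-related parameters; the class and marker value below are assumptions for illustration only:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a deserialization schema: the consumer is meant
// to stop reading from the source once isEndOfStream() returns true.
public class EndMarkerSchema {
    public String deserialize(byte[] recordValue) {
        return new String(recordValue, StandardCharsets.UTF_8);
    }

    // Per FLINK-4194, the Kinesis connector never calls this method, so a
    // data-driven shutdown like the marker check below has no effect there.
    public boolean isEndOfStream(String nextElement) {
        return "END-OF-STREAM".equals(nextElement);
    }
}
```

A consumer that honors the contract would check each deserialized element against `isEndOfStream()` and terminate the source when it returns true.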
[jira] [Commented] (FLINK-4195) Dedicated Configuration classes for Kinesis Consumer / Producer
[ https://issues.apache.org/jira/browse/FLINK-4195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15371496#comment-15371496 ] Robert Metzger commented on FLINK-4195: --- If we are going to rework the configuration, I wonder if it makes sense to use a typed configuration class, similar to Amazon's {{KinesisProducerConfiguration}} class. The reason why I chose to use Properties was to have a configuration pattern similar to the Kafka connector's. Also, it's easier to parse configuration from the command line into properties. But I'm open to discussing this. > Dedicated Configuration classes for Kinesis Consumer / Producer > --- > > Key: FLINK-4195 > URL: https://issues.apache.org/jira/browse/FLINK-4195 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > While fixing FLINK-4170, I feel that configuration and default value setting > & validation is quite messy and unconsolidated for the current state of the > code, and will likely become worse as more configs grow for the Kinesis > connector. > I propose to have a dedicated configuration class (instead of only Java > properties) along the lines of Flink's own {{Configuration}}, so that the > usage pattern is alike. There will be separate configuration classes for > {{FlinkKinesisConsumerConfig}} and {{FlinkKinesisProducerConfig}}. > [~uce] [~rmetzger] What do you think? This will break the interface, so if > we're to change this, I'd prefer to fix it along with FLINK-4170 for Flink > 1.1. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
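The typed-configuration proposal could look roughly like the following. `FlinkKinesisConsumerConfig` does not exist in this form in Flink; the field names, fluent setters, and validation shown here are assumptions sketched along the lines of Amazon's {{KinesisProducerConfiguration}}:

```java
// Hypothetical sketch of a typed, builder-style configuration class as
// discussed in FLINK-4195; names and structure are illustrative only.
public class FlinkKinesisConsumerConfig {
    private String awsRegion;
    private String awsAccessKey;
    private String awsSecretKey;

    public FlinkKinesisConsumerConfig setAwsRegion(String region) {
        this.awsRegion = region;
        return this; // fluent style, mirroring KinesisProducerConfiguration
    }

    public FlinkKinesisConsumerConfig setAwsCredentials(String accessKey, String secretKey) {
        this.awsAccessKey = accessKey;
        this.awsSecretKey = secretKey;
        return this;
    }

    // One advantage over raw Properties: defaults and validation live in a
    // single place instead of being scattered across string-key parsing code.
    public void validate() {
        if (awsRegion == null) {
            throw new IllegalStateException("AWS region must be set");
        }
    }

    public String getAwsRegion() {
        return awsRegion;
    }
}
```

The trade-off raised in the comment still applies: a typed class gains compile-time safety and consolidated validation, while Properties remain easier to populate from command-line arguments.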
[jira] [Resolved] (FLINK-4191) Expose shard information in KinesisDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-4191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4191. --- Resolution: Fixed Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/662b4586 > Expose shard information in KinesisDeserializationSchema > > > Key: FLINK-4191 > URL: https://issues.apache.org/jira/browse/FLINK-4191 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: 1.1.0 > > > Currently, we are not exposing the Shard ID and other shard-related > information in the deserialization schema. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4018) Configurable idle time between getRecords requests to Kinesis shards
[ https://issues.apache.org/jira/browse/FLINK-4018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4018. --- Resolution: Fixed Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/f0387aca > Configurable idle time between getRecords requests to Kinesis shards > > > Key: FLINK-4018 > URL: https://issues.apache.org/jira/browse/FLINK-4018 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > Currently, the Kinesis consumer is calling getRecords() right after finishing > previous calls. This results in easily reaching Amazon's limitation of 5 GET > requests per shard per second. Although the code already has backoff & retry > mechanism, this will affect other applications consuming from the same > Kinesis stream. > Along with this new configuration and the already existing > `KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET`, users will have more > control on the desired throughput behaviour for the Kinesis consumer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
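The effect of the idle time described above can be sketched with the call-rate bound it implies. This is illustrative arithmetic, not the connector's actual implementation:

```java
public class FetchRateBound {
    // With a fixed idle time between getRecords() calls, the per-shard call
    // rate is at most 1000 / idleMillis calls per second; the latency of the
    // call itself only lowers the effective rate further.
    static long maxCallsPerSecond(long idleMillis) {
        return 1000 / idleMillis;
    }

    public static void main(String[] args) {
        // An idle time of 200 ms stays exactly at Amazon's documented limit
        // of 5 GET requests per shard per second.
        System.out.println(maxCallsPerSecond(200)); // prints 5
    }
}
```

Combined with a records-per-get setting, this lets a consumer trade latency for headroom shared with other applications reading the same stream.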
[jira] [Commented] (FLINK-3516) JobManagerHACheckpointRecoveryITCase testCheckpointedStreamingSumProgram fails
[ https://issues.apache.org/jira/browse/FLINK-3516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372521#comment-15372521 ] Robert Metzger commented on FLINK-3516: --- And again https://s3.amazonaws.com/archive.travis-ci.org/jobs/144055780/log.txt > JobManagerHACheckpointRecoveryITCase testCheckpointedStreamingSumProgram fails > -- > > Key: FLINK-3516 > URL: https://issues.apache.org/jira/browse/FLINK-3516 > Project: Flink > Issue Type: Test > Components: Distributed Coordination, JobManager >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Labels: test-stability > > https://s3.amazonaws.com/archive.travis-ci.org/jobs/111782050/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes
[ https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372606#comment-15372606 ] Robert Metzger commented on FLINK-4015: --- What's the throughput of the Kafka producer? Do you think the synchronous producer would be fast enough? I'm asking because the synchronous variant is probably easier to implement and also to operate. With the buffering Kafka producer, you need to maintain all the unconfirmed records in memory, so you may run into garbage collection issues. > FlinkKafkaProducer08 fails when partition leader changes > > > Key: FLINK-4015 > URL: https://issues.apache.org/jira/browse/FLINK-4015 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.2 >Reporter: Sebastian Klemke > > When leader for a partition changes, producer fails with the following > exception: > {code} > 06:34:50,813 INFO org.apache.flink.yarn.YarnJobManager >- Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to > FAILING. 
> java.lang.RuntimeException: Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at OPERATOR.flatMap2(OPERATOR.java:82) > at OPERATOR.flatMap2(OPERATOR.java:16) > at > org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 
10 more > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 13 more > Caused by: java.lang.Exception: Failed to send data to Kafka: This server is > not the leader for that topic-partition. > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 16 more > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: > This server is not the leader for that topic-partition. > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4199) Wrong client behavior when submitting job to non-existing cluster
[ https://issues.apache.org/jira/browse/FLINK-4199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4199: -- Component/s: Client > Wrong client behavior when submitting job to non-existing cluster > - > > Key: FLINK-4199 > URL: https://issues.apache.org/jira/browse/FLINK-4199 > Project: Flink > Issue Type: Bug > Components: Client >Reporter: Kostas Kloudas > > Trying to submit a job jar from the client to a non-existing cluster gives > the following messages. In particular the first and last lines: "Cluster > retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123" > and "Job has been submitted with" are totally misleading. > Cluster retrieved: Standalone cluster with JobManager at > localhost/127.0.0.1:6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of program > Submitting job with JobID: 9c7120e5cc55b2a9157a7e2bc5a12c9d. Waiting for job > completion. > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Communication with JobManager failed: Lost connection to > the JobManager. > Job has been submitted with JobID 9c7120e5cc55b2a9157a7e2bc5a12c9d -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4199) Wrong client behavior when submitting job to non-existing cluster
[ https://issues.apache.org/jira/browse/FLINK-4199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4199: -- Description: Trying to submit a job jar from the client to a non-existing cluster gives the following messages. In particular the first and last lines: "Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123" and "Job has been submitted with" are totally misleading. {code} Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123 Using address localhost:6123 to connect to JobManager. JobManager web interface address http://localhost:8081 Starting execution of program Submitting job with JobID: 9c7120e5cc55b2a9157a7e2bc5a12c9d. Waiting for job completion. org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Lost connection to the JobManager. Job has been submitted with JobID 9c7120e5cc55b2a9157a7e2bc5a12c9d {code} was: Trying to submit a job jar from the client to a non-existing cluster gives the following messages. In particular the first and last lines: "Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123" and "Job has been submitted with" are totally misleading. {code} Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123 Using address localhost:6123 to connect to JobManager. JobManager web interface address http://localhost:8081 Starting execution of program Submitting job with JobID: 9c7120e5cc55b2a9157a7e2bc5a12c9d. Waiting for job completion. org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Lost connection to the JobManager. 
Job has been submitted with JobID 9c7120e5cc55b2a9157a7e2bc5a12c9d > Wrong client behavior when submitting job to non-existing cluster > - > > Key: FLINK-4199 > URL: https://issues.apache.org/jira/browse/FLINK-4199 > Project: Flink > Issue Type: Bug > Components: Client >Reporter: Kostas Kloudas > > Trying to submit a job jar from the client to a non-existing cluster gives > the following messages. In particular the first and last lines: "Cluster > retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123" > and "Job has been submitted with" are totally misleading. > {code} > Cluster retrieved: Standalone cluster with JobManager at > localhost/127.0.0.1:6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of program > Submitting job with JobID: 9c7120e5cc55b2a9157a7e2bc5a12c9d. Waiting for job > completion. > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Communication with JobManager failed: Lost connection to > the JobManager. > Job has been submitted with JobID 9c7120e5cc55b2a9157a7e2bc5a12c9d > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4199) Wrong client behavior when submitting job to non-existing cluster
[ https://issues.apache.org/jira/browse/FLINK-4199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4199: -- Description: Trying to submit a job jar from the client to a non-existing cluster gives the following messages. In particular the first and last lines: "Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123" and "Job has been submitted with" are totally misleading. {code} Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123 Using address localhost:6123 to connect to JobManager. JobManager web interface address http://localhost:8081 Starting execution of program Submitting job with JobID: 9c7120e5cc55b2a9157a7e2bc5a12c9d. Waiting for job completion. org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Lost connection to the JobManager. Job has been submitted with JobID 9c7120e5cc55b2a9157a7e2bc5a12c9d was: Trying to submit a job jar from the client to a non-existing cluster gives the following messages. In particular the first and last lines: "Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123" and "Job has been submitted with" are totally misleading. Cluster retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123 Using address localhost:6123 to connect to JobManager. JobManager web interface address http://localhost:8081 Starting execution of program Submitting job with JobID: 9c7120e5cc55b2a9157a7e2bc5a12c9d. Waiting for job completion. org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Lost connection to the JobManager. 
Job has been submitted with JobID 9c7120e5cc55b2a9157a7e2bc5a12c9d > Wrong client behavior when submitting job to non-existing cluster > - > > Key: FLINK-4199 > URL: https://issues.apache.org/jira/browse/FLINK-4199 > Project: Flink > Issue Type: Bug > Components: Client >Reporter: Kostas Kloudas > > Trying to submit a job jar from the client to a non-existing cluster gives > the following messages. In particular the first and last lines: "Cluster > retrieved: Standalone cluster with JobManager at localhost/127.0.0.1:6123" > and "Job has been submitted with" are totally misleading. > {code} > Cluster retrieved: Standalone cluster with JobManager at > localhost/127.0.0.1:6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of program > Submitting job with JobID: 9c7120e5cc55b2a9157a7e2bc5a12c9d. Waiting for job > completion. > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Communication with JobManager failed: Lost connection to > the JobManager. > Job has been submitted with JobID 9c7120e5cc55b2a9157a7e2bc5a12c9d -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3704) JobManagerHAProcessFailureBatchRecoveryITCase.testJobManagerProcessFailure unstable
[ https://issues.apache.org/jira/browse/FLINK-3704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15374601#comment-15374601 ] Robert Metzger commented on FLINK-3704: --- Once again: https://s3.amazonaws.com/archive.travis-ci.org/jobs/144211723/log.txt > JobManagerHAProcessFailureBatchRecoveryITCase.testJobManagerProcessFailure > unstable > --- > > Key: FLINK-3704 > URL: https://issues.apache.org/jira/browse/FLINK-3704 > Project: Flink > Issue Type: Bug > Components: JobManager >Reporter: Robert Metzger > Labels: test-stability > > https://s3.amazonaws.com/archive.travis-ci.org/jobs/120882840/log.txt > {code} > testJobManagerProcessFailure[1](org.apache.flink.test.recovery.JobManagerHAProcessFailureBatchRecoveryITCase) > Time elapsed: 9.302 sec <<< ERROR! > java.io.IOException: Actor at > akka.tcp://flink@127.0.0.1:55591/user/jobmanager not reachable. Please make > sure that the actor is running and its port is reachable. > at > org.apache.flink.runtime.akka.AkkaUtils$.getActorRef(AkkaUtils.scala:384) > at org.apache.flink.runtime.akka.AkkaUtils.getActorRef(AkkaUtils.scala) > at > org.apache.flink.test.recovery.JobManagerHAProcessFailureBatchRecoveryITCase.testJobManagerProcessFailure(JobManagerHAProcessFailureBatchRecoveryITCase.java:290) > Caused by: akka.actor.ActorNotFound: Actor not found for: > ActorSelection[Anchor(akka.tcp://flink@127.0.0.1:55591/), > Path(/user/jobmanager)] > at > akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65) > at > akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) > at > 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) > at > scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) > at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) > at > akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74) > at > akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110) > at > akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73) > at > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) > at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267) > at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508) > at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541) > at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531) > at > akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87) > at akka.remote.EndpointWriter.postStop(Endpoint.scala:561) > at akka.actor.Actor$class.aroundPostStop(Actor.scala:475) > at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415) > at > akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) > at > akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) > at akka.actor.ActorCell.terminate(ActorCell.scala:369) > at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462) > at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) > at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279) > at akka.dispatch.Mailbox.run(Mailbox.scala:220) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4127) Clean up configuration and check breaking API changes
[ https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4127. --- Resolution: Fixed Merged in http://git-wip-us.apache.org/repos/asf/flink/commit/6b7bb761 > Clean up configuration and check breaking API changes > - > > Key: FLINK-4127 > URL: https://issues.apache.org/jira/browse/FLINK-4127 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: 1.1.0 > > Attachments: flink-core.html, flink-java.html, flink-scala.html, > flink-streaming-java.html, flink-streaming-scala.html > > > For the upcoming 1.1 release, I'll check if there are any breaking API > changes and if the documentation is up to date with the configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4206) Metric names should allow special characters
[ https://issues.apache.org/jira/browse/FLINK-4206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4206: -- Summary: Metric names should allow special characters (was: Metric names should alle special characters) > Metric names should allow special characters > > > Key: FLINK-4206 > URL: https://issues.apache.org/jira/browse/FLINK-4206 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.1.0 > > > Currently, the name of the metric is restricted to alphanumeric characters. > This restriction was originally put in place to circumvent issues due to > systems not supporting certain characters. > However, this restriction does not make a lot of sense since for group names > we don't enforce such a restriction. > This also affects the integration of the Kafka metrics, so I suggest removing > the restriction. > From now on it will be the responsibility of the reporter to make sure that > the metric identifier is supported by the external system. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4197) Allow Kinesis Endpoint to be Overridden via Config
[ https://issues.apache.org/jira/browse/FLINK-4197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4197. --- Resolution: Fixed Assignee: Scott Kidder Fix Version/s: (was: 1.0.4) 1.1.0 Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/bc3a96f5 Thank you for the contribution [~skidder]. > Allow Kinesis Endpoint to be Overridden via Config > -- > > Key: FLINK-4197 > URL: https://issues.apache.org/jira/browse/FLINK-4197 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.0.3 >Reporter: Scott Kidder >Assignee: Scott Kidder >Priority: Minor > Labels: easyfix > Fix For: 1.1.0 > > Original Estimate: 1h > Remaining Estimate: 1h > > I perform local testing of my application stack with Flink configured as a > consumer on a Kinesis stream provided by Kinesalite, an implementation of > Kinesis built on LevelDB. This requires me to override the AWS endpoint to > refer to my local Kinesalite server rather than reference the real AWS > endpoint. I'd like to add a configuration property to the Kinesis streaming > connector that allows the AWS endpoint to be specified explicitly. > This should be a fairly small change and provide a lot of flexibility to > people looking to integrate Flink with Kinesis in a non-production setup. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4206) Metric names should allow special characters
[ https://issues.apache.org/jira/browse/FLINK-4206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4206. --- Resolution: Fixed Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/cc60ba42 > Metric names should allow special characters > > > Key: FLINK-4206 > URL: https://issues.apache.org/jira/browse/FLINK-4206 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.1.0 > > > Currently, the name of the metric is restricted to alphanumeric characters. > This restriction was originally put in place to circumvent issues due to > systems not supporting certain characters. > However, this restriction does not make a lot of sense since for group names > we don't enforce such a restriction. > This also affects the integration of the Kafka metrics, so I suggest removing > the restriction. > From now on it will be the responsibility of the reporter to make sure that > the metric identifier is supported by the external system. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
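The resolution above moves sanitization responsibility to the reporters. As a minimal sketch of what such a reporter-side filter could look like (the `MetricNameSanitizer` class, the allowed character set, and the `_` replacement are illustrative assumptions, not Flink's actual reporter API):

```java
public class MetricNameSanitizer {

    // Replace any character the target system cannot handle with '_'.
    // The allowed set here (alphanumerics, dots, dashes) is an assumption;
    // a real reporter must match its own backend's naming rules.
    public static String sanitize(String metricIdentifier) {
        return metricIdentifier.replaceAll("[^A-Za-z0-9.\\-]", "_");
    }

    public static void main(String[] args) {
        // prints kafka.consumer_records-lag_max
        System.out.println(sanitize("kafka.consumer:records-lag max"));
    }
}
```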
[jira] [Commented] (FLINK-4194) KinesisDeserializationSchema.isEndOfStream() is never called
[ https://issues.apache.org/jira/browse/FLINK-4194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15376674#comment-15376674 ] Robert Metzger commented on FLINK-4194: --- I'm not planning to work on this one in the near future :) I know that the {{isEndOfStream()}} method is implemented differently in the two Kafka consumers. I agree that the {{FlinkKafkaConsumer09}} behavior is more reasonable. The Kinesis consumer should behave in the same fashion (stopping the consumption on all shards). Since there is no communication between the parallel consumer instances, the behavior of the {{isEndOfStream()}} method is not really defined anyway, and the method needs to be implemented in a way that it stops all parallel instances / shards / partitions. > KinesisDeserializationSchema.isEndOfStream() is never called > > > Key: FLINK-4194 > URL: https://issues.apache.org/jira/browse/FLINK-4194 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector >Affects Versions: 1.1.0 >Reporter: Robert Metzger > > The Kinesis connector does not respect the > {{KinesisDeserializationSchema.isEndOfStream()}} method. > The purpose of this method is to stop consuming from a source, based on input > data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
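The comment's point is that the end-of-stream predicate is evaluated independently per parallel instance, so the stop condition must be satisfiable on the data each shard or partition sees. A minimal illustrative sketch of that shape (the `SentinelSchema` class and method signatures here only mirror the idea; they are not the connector's actual interface):

```java
// Illustrative only: a deserialization schema whose end-of-stream predicate
// triggers on a sentinel value that must appear on every shard/partition,
// because there is no coordination between parallel consumer instances.
public class SentinelSchema {

    private static final String SENTINEL = "END-OF-STREAM";

    public String deserialize(byte[] record) {
        return new String(record, java.nio.charset.StandardCharsets.UTF_8);
    }

    // Called once per deserialized element, per parallel instance. Each
    // instance stops only itself; the whole source stops only when the
    // condition has held on every shard it reads.
    public boolean isEndOfStream(String nextElement) {
        return SENTINEL.equals(nextElement);
    }

    public static void main(String[] args) {
        SentinelSchema schema = new SentinelSchema();
        byte[] raw = SENTINEL.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        // prints true
        System.out.println(schema.isEndOfStream(schema.deserialize(raw)));
    }
}
```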
[jira] [Resolved] (FLINK-4170) Remove `CONFIG_` prefix from KinesisConfigConstants variables
[ https://issues.apache.org/jira/browse/FLINK-4170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4170. --- Resolution: Fixed Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/f1d79f1d Thank you Gordon. > Remove `CONFIG_` prefix from KinesisConfigConstants variables > - > > Key: FLINK-4170 > URL: https://issues.apache.org/jira/browse/FLINK-4170 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Ufuk Celebi >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > I find the static variable names verbose. I think it's clear from context > that they refer to the Kinesis configuration since they are all gathered in > that class. > Therefore would like to remove the {{CONFIG_}} prefix before the release, so > that we have > {code} > conf.put(KinesisConfigConstants.AWS_REGION, "") > {code} > instead of > {code} > conf.put(KinesisConfigConstants.CONFIG_AWS_REGION, "") > {code} > For longer variables it becomes even longer otherwise. > --- > Some basic variable names that might be accessed frequently are also very > long: > {code} > CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY > CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID > {code} > It might suffice to just have: > {code} > AWS_SECRET_KEY > AWS_ACCESS_KEY > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4214) JobExceptionsHandler will return all exceptions
[ https://issues.apache.org/jira/browse/FLINK-4214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4214: -- Component/s: Webfrontend > JobExceptionsHandler will return all exceptions > --- > > Key: FLINK-4214 > URL: https://issues.apache.org/jira/browse/FLINK-4214 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Reporter: Sumit Chawla >Priority: Minor > > JobExceptionsHandler will return all exceptions and is not incrementing the > integer to track the exceptions being serialized -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4211) Dynamic Properties not working for jobs submitted to Yarn session
[ https://issues.apache.org/jira/browse/FLINK-4211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15376886#comment-15376886 ] Robert Metzger commented on FLINK-4211: --- I think this is the expected behavior. The dynamic properties are one-time system configuration changes for a Flink-on-YARN application. Since you cannot change the system config of Flink when submitting a job to the already bootstrapped server, this doesn't work. Is it okay if I close this as "Invalid", or do you think we should use this to clarify the documentation? > Dynamic Properties not working for jobs submitted to Yarn session > - > > Key: FLINK-4211 > URL: https://issues.apache.org/jira/browse/FLINK-4211 > Project: Flink > Issue Type: Bug > Components: YARN Client >Reporter: Stefan Richter > > The command line argument for dynamic properties (-D) is not working when > submitting jobs to a flink session. > Example: > {code} > bin/flink run -p 4 myJob.jar -D recovery.zookeeper.path.root=/flink/xyz > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4218) Sporadic "java.lang.RuntimeException: Error triggering a checkpoint..." causes task restarting
[ https://issues.apache.org/jira/browse/FLINK-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15377211#comment-15377211 ] Robert Metzger commented on FLINK-4218: --- I think this is an artifact of S3's consistency model. Enabling EMRFS on EMR will probably resolve this issue: https://www.infoq.com/news/2015/01/emrfs-s3-consistency > Sporadic "java.lang.RuntimeException: Error triggering a checkpoint..." > causes task restarting > -- > > Key: FLINK-4218 > URL: https://issues.apache.org/jira/browse/FLINK-4218 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.1.0 >Reporter: Sergii Koshel > > Sporadically see exception as below. And restart of task because of it. > {code:title=Exception|borderStyle=solid} > java.lang.RuntimeException: Error triggering a checkpoint as the result of > receiving checkpoint barrier > at > org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:785) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:775) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.FileNotFoundException: No such file or directory: > s3:///flink/checkpoints/ece317c26960464ba5de75f3bbc38cb2/chk-8810/96eebbeb-de14-45c7-8ebb-e7cde978d6d3 > at > org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:996) > at > 
org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:351) > at > org.apache.flink.runtime.state.filesystem.AbstractFileStateHandle.getFileSize(AbstractFileStateHandle.java:93) > at > org.apache.flink.runtime.state.filesystem.FileStreamStateHandle.getStateSize(FileStreamStateHandle.java:58) > at > org.apache.flink.runtime.state.AbstractStateBackend$DataInputViewHandle.getStateSize(AbstractStateBackend.java:482) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskStateList.getStateSize(StreamTaskStateList.java:77) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:604) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:779) > ... 8 more > {code} > File actually exists on S3. > I suppose it is related to some race conditions with S3 but would be good to > retry a few times before stop task execution. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
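The reporter above suggests retrying the lookup a few times before failing the task, since the file does exist and the error is a transient read-after-write visibility delay. A minimal, generic sketch of that idea (the `RetryOnEventualConsistency` class, attempt count, and backoff values are illustrative assumptions, not Flink's checkpoint code):

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.Callable;

public class RetryOnEventualConsistency {

    // Retry a metadata lookup a few times before giving up, to ride out
    // transient visibility delays such as S3's eventually consistent listings.
    // Only FileNotFoundException is retried; other failures propagate.
    public static <T> T withRetries(Callable<T> action, int attempts, long backoffMs)
            throws Exception {
        IOException last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return action.call();
            } catch (FileNotFoundException e) {
                last = e;
                Thread.sleep(backoffMs * (i + 1)); // linear backoff between attempts
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulate a lookup that only becomes visible on the third attempt.
        String result = withRetries(() -> {
            if (++calls[0] < 3) {
                throw new FileNotFoundException("key not visible yet");
            }
            return "ok";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```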
[jira] [Assigned] (FLINK-4142) Recovery problem in HA on Hadoop Yarn 2.4.1
[ https://issues.apache.org/jira/browse/FLINK-4142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger reassigned FLINK-4142: - Assignee: Robert Metzger > Recovery problem in HA on Hadoop Yarn 2.4.1 > --- > > Key: FLINK-4142 > URL: https://issues.apache.org/jira/browse/FLINK-4142 > Project: Flink > Issue Type: Bug > Components: YARN Client >Affects Versions: 1.0.3 >Reporter: Stefan Richter >Assignee: Robert Metzger > > On Hadoop Yarn 2.4.1, recovery in HA fails in the following scenario: > 1) Kill application master, let it recover normally. > 2) After that, kill a task manager. > Now, Yarn tries to restart the killed task manager in an endless loop. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4142) Recovery problem in HA on Hadoop Yarn 2.4.1
[ https://issues.apache.org/jira/browse/FLINK-4142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15379079#comment-15379079 ] Robert Metzger commented on FLINK-4142: --- Thank you for posting a log as well. It seems to be a YARN specific issue: {code} 2016-07-01 15:45:03,452 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1467387903451: Container: [ContainerId: container_1467387670862_0001_02_02, NodeId: hadoop-srichter-worker-3-vm.c.astral-sorter-757.internal:40436, NodeHttpAddress: hadoop-srichter-worker-3-vm.c.astral-sorter-757.internal:8042, Resource: , Priority: 0, Token: Token { kind: ContainerToken, service: 10.240.0.18:40436 }, ] on host hadoop-srichter-worker-3-vm.c.astral-sorter-757.internal 2016-07-01 15:45:03,455 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : hadoop-srichter-worker-3-vm.c.astral-sorter-757.internal:40436 2016-07-01 15:45:03,508 ERROR org.apache.flink.yarn.YarnFlinkResourceManager - Could not start TaskManager in container ContainerInLaunch @ 1467387903451: Container: [ContainerId: container_1467387670862_0001_02_02, NodeId: hadoop-srichter-worker-3-vm.c.astral-sorter-757.internal:40436, NodeHttpAddress: hadoop-srichter-worker-3-vm.c.astral-sorter-757.internal:8042, Resource: , Priority: 0, Token: Token { kind: ContainerToken, service: 10.240.0.18:40436 }, ] org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to start container. 
NMToken for application attempt : appattempt_1467387670862_0001_01 was used for starting container with container token issued for application attempt : appattempt_1467387670862_0001_02 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:152) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106) at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:206) at org.apache.flink.yarn.YarnFlinkResourceManager.containersAllocated(YarnFlinkResourceManager.java:403) at org.apache.flink.yarn.YarnFlinkResourceManager.handleMessage(YarnFlinkResourceManager.java:164) at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:90) at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:70) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) at akka.dispatch.Mailbox.run(Mailbox.scala:221) at akka.dispatch.Mailbox.exec(Mailbox.scala:231) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 2016-07-01 15:45:03,508 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 4096 megabytes memory. Pending requests: 1 2016-07-01 15:45:03,959 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Container ResourceID{resourceId='container_1467387670862_0001_02_02'} completed successfully with diagnostics: Container released by application {code} The problem was a major bug in Hadoop 2.4.0. It has been fixed in Hadoop 2.5.0. https://issues.apache.org/jira/browse/YARN-2065 I'll add a warning to the YARN documentation page that there are issues with HA on YARN < 2.5.0. > Recovery problem in HA on Hadoop Yarn 2.4.1 > --- > > Key: FLINK-4142 > URL: https://issues.apache.org/jira/browse/FLINK-4142 > Project: Flink > Issue Type: Bug > Components: YARN Client >Affects Versions: 1.0.3 >Reporter: Stefan Richter > > On Hadoop Yarn 2.4.1, recovery in HA fails in the following scenari
[jira] [Resolved] (FLINK-4186) Expose Kafka metrics through Flink metrics
[ https://issues.apache.org/jira/browse/FLINK-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4186. --- Resolution: Fixed Fix Version/s: 1.1.0 Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/41f58182 > Expose Kafka metrics through Flink metrics > -- > > Key: FLINK-4186 > URL: https://issues.apache.org/jira/browse/FLINK-4186 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.1.0 >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: 1.1.0 > > > Currently, we expose the Kafka metrics through Flink's accumulators. > We can now use the metrics system in Flink to report Kafka metrics. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3204) TaskManagers are not shutting down properly on YARN
[ https://issues.apache.org/jira/browse/FLINK-3204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-3204: -- Affects Version/s: 1.1.0 > TaskManagers are not shutting down properly on YARN > --- > > Key: FLINK-3204 > URL: https://issues.apache.org/jira/browse/FLINK-3204 > Project: Flink > Issue Type: Bug > Components: YARN Client >Affects Versions: 1.0.0, 1.1.0 >Reporter: Robert Metzger > Labels: test-stability > > While running some experiments on a YARN cluster, I saw the following error > {code} > 10:15:24,741 INFO org.apache.flink.yarn.YarnJobManager >- Stopping YARN JobManager with status SUCCEEDED and diagnostic Flink YARN > Client requested shutdown. > 10:15:24,748 INFO org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl >- Waiting for application to be successfully unregistered. > 10:15:24,852 INFO > org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl - > Interrupted while waiting for queue > java.lang.InterruptedException > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2017) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2052) > at > java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) > at > org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:275) > 10:15:24,875 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl >- Failed to stop Container container_1452019681933_0002_01_10when > stopping NMClientImpl > 10:15:24,899 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl >- Failed to stop Container container_1452019681933_0002_01_07when > stopping NMClientImpl > 10:15:24,954 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl >- Failed to stop Container container_1452019681933_0002_01_06when > stopping NMClientImpl > 10:15:24,982 ERROR 
org.apache.hadoop.yarn.client.api.impl.NMClientImpl >- Failed to stop Container container_1452019681933_0002_01_09when > stopping NMClientImpl > 10:15:25,013 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl >- Failed to stop Container container_1452019681933_0002_01_11when > stopping NMClientImpl > 10:15:25,037 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl >- Failed to stop Container container_1452019681933_0002_01_08when > stopping NMClientImpl > 10:15:25,041 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl >- Failed to stop Container container_1452019681933_0002_01_12when > stopping NMClientImpl > 10:15:25,072 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl >- Failed to stop Container container_1452019681933_0002_01_05when > stopping NMClientImpl > 10:15:25,075 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl >- Failed to stop Container container_1452019681933_0002_01_03when > stopping NMClientImpl > 10:15:25,077 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl >- Failed to stop Container container_1452019681933_0002_01_04when > stopping NMClientImpl > 10:15:25,079 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl >- Failed to stop Container container_1452019681933_0002_01_02when > stopping NMClientImpl > 10:15:25,080 INFO > org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - > Closing proxy : cdh544-worker-0.c.astral-sorter-757.internal:8041 > 10:15:25,080 INFO > org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - > Closing proxy : cdh544-worker-1.c.astral-sorter-757.internal:8041 > 10:15:25,080 INFO > org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - > Closing proxy : cdh544-master.c.astral-sorter-757.internal:8041 > 10:15:25,080 INFO > org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - > Closing proxy : cdh544-worker-4.c.astral-sorter-757.internal:8041 > 10:15:25,081 INFO > 
org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - > Closing proxy : cdh544-worker-2.c.astral-sorter-757.internal:8041 > 10:15:25,081 INFO > org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - > Closing proxy : cdh544-worker-3.c.astral-sorter-757.internal:8041 > 10:15:25,081 INFO > org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - > Closing proxy : cdh544-worker-5.c.astral-sorter-757.internal:8041 > 10:15:25,085 INFO org.apache.flink.yarn.YarnJobManager >- Stopping J
[jira] [Updated] (FLINK-3204) TaskManagers are not shutting down properly on YARN
[ https://issues.apache.org/jira/browse/FLINK-3204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-3204:
--
Labels: test-stability  (was: )

> TaskManagers are not shutting down properly on YARN
> ---
>
> Key: FLINK-3204
> URL: https://issues.apache.org/jira/browse/FLINK-3204
> Project: Flink
> Issue Type: Bug
> Components: YARN Client
> Affects Versions: 1.0.0, 1.1.0
> Reporter: Robert Metzger
> Labels: test-stability
>
> While running some experiments on a YARN cluster, I saw the following error:
> {code}
> 10:15:24,741 INFO  org.apache.flink.yarn.YarnJobManager - Stopping YARN JobManager with status SUCCEEDED and diagnostic Flink YARN Client requested shutdown.
> 10:15:24,748 INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - Waiting for application to be successfully unregistered.
> 10:15:24,852 INFO  org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl - Interrupted while waiting for queue
> java.lang.InterruptedException
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2017)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2052)
>     at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>     at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:275)
> 10:15:24,875 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl - Failed to stop Container container_1452019681933_0002_01_10 when stopping NMClientImpl
> 10:15:24,899 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl - Failed to stop Container container_1452019681933_0002_01_07 when stopping NMClientImpl
> 10:15:24,954 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl - Failed to stop Container container_1452019681933_0002_01_06 when stopping NMClientImpl
> 10:15:24,982 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl - Failed to stop Container container_1452019681933_0002_01_09 when stopping NMClientImpl
> 10:15:25,013 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl - Failed to stop Container container_1452019681933_0002_01_11 when stopping NMClientImpl
> 10:15:25,037 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl - Failed to stop Container container_1452019681933_0002_01_08 when stopping NMClientImpl
> 10:15:25,041 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl - Failed to stop Container container_1452019681933_0002_01_12 when stopping NMClientImpl
> 10:15:25,072 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl - Failed to stop Container container_1452019681933_0002_01_05 when stopping NMClientImpl
> 10:15:25,075 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl - Failed to stop Container container_1452019681933_0002_01_03 when stopping NMClientImpl
> 10:15:25,077 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl - Failed to stop Container container_1452019681933_0002_01_04 when stopping NMClientImpl
> 10:15:25,079 ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl - Failed to stop Container container_1452019681933_0002_01_02 when stopping NMClientImpl
> 10:15:25,080 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Closing proxy : cdh544-worker-0.c.astral-sorter-757.internal:8041
> 10:15:25,080 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Closing proxy : cdh544-worker-1.c.astral-sorter-757.internal:8041
> 10:15:25,080 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Closing proxy : cdh544-master.c.astral-sorter-757.internal:8041
> 10:15:25,080 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Closing proxy : cdh544-worker-4.c.astral-sorter-757.internal:8041
> 10:15:25,081 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Closing proxy : cdh544-worker-2.c.astral-sorter-757.internal:8041
> 10:15:25,081 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Closing proxy : cdh544-worker-3.c.astral-sorter-757.internal:8041
> 10:15:25,081 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Closing proxy : cdh544-worker-5.c.astral-sorter-757.internal:8041
> 10:15:25,085 INFO  org.apache.flink.yarn.YarnJobManager - Sto
[jira] [Commented] (FLINK-3204) TaskManagers are not shutting down properly on YARN
[ https://issues.apache.org/jira/browse/FLINK-3204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15379516#comment-15379516 ] Robert Metzger commented on FLINK-3204:
---
I think this error still persists and also shows up in Travis tests: https://s3.amazonaws.com/archive.travis-ci.org/jobs/144954182/log.txt
(From the /yarn-tests/container_1468587486405_0005_01_01/jobmanager.log file in https://s3.amazonaws.com/flink-logs-us/travis-artifacts/rmetzger/flink/1600/1600.5.tar.gz)
{code}
2016-07-15 12:59:46,808 INFO  org.apache.flink.yarn.YarnFlinkResourceManager - Shutting down cluster with status SUCCEEDED : Flink YARN Client requested shutdown
2016-07-15 12:59:46,809 INFO  org.apache.flink.yarn.YarnFlinkResourceManager - Unregistering application from the YARN Resource Manager
2016-07-15 12:59:46,817 INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - Waiting for application to be successfully unregistered.
2016-07-15 12:59:46,832 INFO  org.apache.flink.yarn.YarnJobManager - Stopping JobManager akka.tcp://flink@172.17.3.43:49869/user/jobmanager.
2016-07-15 12:59:46,846 INFO  org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:50790
2016-07-15 12:59:46,864 ERROR org.apache.flink.yarn.YarnJobManager - Executor could not execute task
java.util.concurrent.RejectedExecutionException
    at scala.concurrent.forkjoin.ForkJoinPool.fullExternalPush(ForkJoinPool.java:1870)
    at scala.concurrent.forkjoin.ForkJoinPool.externalPush(ForkJoinPool.java:1834)
    at scala.concurrent.forkjoin.ForkJoinPool.execute(ForkJoinPool.java:2955)
    at scala.concurrent.impl.ExecutionContextImpl.execute(ExecutionContextImpl.scala:107)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
    at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:89)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2016-07-15 12:59:46,920 INFO  org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl - Interrupted while waiting for queue
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2017)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2052)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:275)
2016-07-15 12:59:46,932 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Closing proxy : testing-worker-linux-docker-99c17e61-3364-linux-5.prod.travis-ci.org:38889
2016-07-15 12:59:46,987 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.
2016-07-15 12:59:46,987 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.
2016-07-15 12:59:47,037 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.
{code}

> TaskManagers are not shutting down properly on YARN
> ---
>
> Key: FLINK-3204
> URL: https://issues.apache.org/jira/browse/FLINK-3204
> Project: Flink
> Issue Type: Bug
> Components: YARN Client
> Affects Versions: 1.0.0, 1.1.0
> Reporter: Robert Metzger
> Labels: test-stability
>
> While running some experiments on a YARN cluster, I saw the following error:
> {code}
> 10:15:24,741
[jira] [Updated] (FLINK-4676) Merge flink-batch-connectors and flink-streaming-connectors modules
[ https://issues.apache.org/jira/browse/FLINK-4676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4676:
--
Component/s: Streaming Connectors
             Batch Connectors and Input/Output Formats

> Merge flink-batch-connectors and flink-streaming-connectors modules
> ---
>
> Key: FLINK-4676
> URL: https://issues.apache.org/jira/browse/FLINK-4676
> Project: Flink
> Issue Type: Task
> Components: Batch Connectors and Input/Output Formats, Build System, Streaming Connectors
> Affects Versions: 1.2.0
> Reporter: Fabian Hueske
> Priority: Minor
> Fix For: 1.2.0
>
> We have two separate Maven modules for batch and streaming connectors (flink-batch-connectors and flink-streaming-connectors) that contain modules for the individual external systems and storage formats, such as HBase, Cassandra, Avro, Elasticsearch, etc.
> Some of these systems, for instance HBase, Cassandra, and Elasticsearch, can be used in streaming as well as batch jobs. However, due to the separate main modules for streaming and batch connectors, we currently need to decide where to put each connector. For example, the flink-connector-cassandra module is located in flink-streaming-connectors but includes a CassandraInputFormat and a CassandraOutputFormat (i.e., a batch source and sink).
> This issue is about merging flink-batch-connectors and flink-streaming-connectors into a joint flink-connectors module. Names of moved modules should not be changed, to keep the change of code structure transparent to users (although this leads to an inconsistent naming scheme: flink-connector-cassandra vs. flink-hbase).
--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4786) BarrierBufferTest test instability
Robert Metzger created FLINK-4786:
--
Summary: BarrierBufferTest test instability
Key: FLINK-4786
URL: https://issues.apache.org/jira/browse/FLINK-4786
Project: Flink
Issue Type: Bug
Affects Versions: 1.2.0
Reporter: Robert Metzger

{code}
Failed tests:
  BarrierBufferTest.testMultiChannelSkippingCheckpointsViaBlockedInputs:765->checkNoTempFilesRemain:949 barrier buffer did not clean up temp files. remaining file: baee85c650f0d1fe1fa6760329621d6ffc17d3f78b4d5b9c1009144ec438ca5b.1.buffer
  BarrierBufferTest.testMultiChannelWithBarriers:306->checkNoTempFilesRemain:949 barrier buffer did not clean up temp files. remaining file: baee85c650f0d1fe1fa6760329621d6ffc17d3f78b4d5b9c1009144ec438ca5b.1.buffer
  BarrierBufferTest.testMultiChannelWithQueuedFutureBarriers:445->validateAlignmentTime:957 wrong alignment time
{code}

in https://s3.amazonaws.com/archive.travis-ci.org/jobs/165528849/log.txt
--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Issue Comment Deleted] (FLINK-4155) Get Kafka producer partition info in open method instead of constructor
[ https://issues.apache.org/jira/browse/FLINK-4155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4155:
--
Comment: was deleted
(was: This issue has been resolved with FLINK-4379.)

> Get Kafka producer partition info in open method instead of constructor
> ---
>
> Key: FLINK-4155
> URL: https://issues.apache.org/jira/browse/FLINK-4155
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.0, 1.0.3
> Reporter: Gyula Fora
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
> Currently the Flink Kafka producer does not really do any error handling if something is wrong with the partition metadata, as the metadata is serialized with the user function.
> This means that in some cases the job can go into an error loop when using checkpoints (for example, when restarting from a savepoint, which re-runs the constructor). Getting the partition info in the open method would solve this problem.
--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4155) Get Kafka producer partition info in open method instead of constructor
[ https://issues.apache.org/jira/browse/FLINK-4155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15562207#comment-15562207 ] Robert Metzger commented on FLINK-4155:
---
This issue has been resolved with FLINK-4379.

> Get Kafka producer partition info in open method instead of constructor
> ---
>
> Key: FLINK-4155
> URL: https://issues.apache.org/jira/browse/FLINK-4155
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.0, 1.0.3
> Reporter: Gyula Fora
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
> Currently the Flink Kafka producer does not really do any error handling if something is wrong with the partition metadata, as the metadata is serialized with the user function.
> This means that in some cases the job can go into an error loop when using checkpoints (for example, when restarting from a savepoint, which re-runs the constructor). Getting the partition info in the open method would solve this problem.
--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4023) Move Kafka consumer partition discovery from constructor to open()
[ https://issues.apache.org/jira/browse/FLINK-4023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15562211#comment-15562211 ] Robert Metzger commented on FLINK-4023:
---
This has been resolved in FLINK-4379.

> Move Kafka consumer partition discovery from constructor to open()
> ---
>
> Key: FLINK-4023
> URL: https://issues.apache.org/jira/browse/FLINK-4023
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Michal Harish
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Minor
> Labels: kafka, kafka-0.8
> Fix For: 1.2.0
>
> Currently, Flink queries Kafka for partition information when creating the Kafka consumer. This happens on the client when the Flink job is submitted, so the client must be able to fetch the partition data from Kafka, even though Kafka may only be accessible from the cluster environment where the tasks will be running. Moving the partition discovery to the open() method should solve this problem.
--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4023) Move Kafka consumer partition discovery from constructor to open()
[ https://issues.apache.org/jira/browse/FLINK-4023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4023.
---
Resolution: Fixed
Assignee: Stefan Richter  (was: Tzu-Li (Gordon) Tai)

> Move Kafka consumer partition discovery from constructor to open()
> ---
>
> Key: FLINK-4023
> URL: https://issues.apache.org/jira/browse/FLINK-4023
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Michal Harish
> Assignee: Stefan Richter
> Priority: Minor
> Labels: kafka, kafka-0.8
> Fix For: 1.2.0
>
> Currently, Flink queries Kafka for partition information when creating the Kafka consumer. This happens on the client when the Flink job is submitted, so the client must be able to fetch the partition data from Kafka, even though Kafka may only be accessible from the cluster environment where the tasks will be running. Moving the partition discovery to the open() method should solve this problem.
--
This message was sent by Atlassian JIRA (v6.3.4#6332)
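[Editor's note] The two Kafka issues above (FLINK-4155, FLINK-4023) share one root cause: work done in a connector's constructor runs on the client and its result is serialized into the user function, whereas work done in open() runs on each task manager after deserialization. The sketch below illustrates that difference with plain Java serialization. It is a minimal, self-contained illustration under assumed names — EagerConsumer, LazyConsumer, and fetchPartitions() are hypothetical stand-ins, not Flink or Kafka API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;

// Hypothetical sketch (not Flink's actual classes): contrasts discovering
// partition metadata in the constructor vs. in open().
public class PartitionDiscoverySketch {

    // Stand-in for a metadata lookup against the broker; in a real job this
    // network call may only succeed from inside the cluster.
    static List<String> fetchPartitions() {
        return List.of("topic-0", "topic-1");
    }

    // Anti-pattern: the constructor runs on the client, and the discovered
    // partitions are serialized into the function's state. Restoring from a
    // savepoint reuses the stale, baked-in list.
    static class EagerConsumer implements Serializable {
        final List<String> partitions = fetchPartitions(); // runs on the client
    }

    // Preferred: the field is transient, so nothing is serialized; open()
    // runs on the worker after deserialization and fetches fresh metadata
    // from inside the cluster.
    static class LazyConsumer implements Serializable {
        transient List<String> partitions;
        void open() { partitions = fetchPartitions(); }
    }

    // Simulates shipping the function from the client to a task manager.
    static <T extends Serializable> T roundTrip(T obj) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(obj);
        oos.close();
        ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        @SuppressWarnings("unchecked")
        T copy = (T) in.readObject();
        return copy;
    }

    public static void main(String[] args) throws Exception {
        LazyConsumer shipped = roundTrip(new LazyConsumer());
        System.out.println(shipped.partitions); // null: nothing baked in
        shipped.open();                         // discovery on the worker
        System.out.println(shipped.partitions); // fresh metadata
    }
}
```

An EagerConsumer, by contrast, carries whatever list the client saw at construction time through every serialization round trip — which is exactly the stale state the issue descriptions say leads to error loops on restore.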