[jira] [Commented] (FLINK-19154) Application mode deletes HA data in case of suspended ZooKeeper connection
[ https://issues.apache.org/jira/browse/FLINK-19154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221779#comment-17221779 ]

Cristian commented on FLINK-19154:
----------------------------------

For the record, this does seem to fix the issue. We haven't been able to reproduce the problem after upgrading our test environment to this new version.

> Application mode deletes HA data in case of suspended ZooKeeper connection
> --------------------------------------------------------------------------
>
>                 Key: FLINK-19154
>                 URL: https://issues.apache.org/jira/browse/FLINK-19154
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission
>    Affects Versions: 1.12.0, 1.11.1
>         Environment: Run a stand-alone cluster that runs a single job (if you are familiar with the way Ververica Platform runs Flink jobs, we use a very similar approach). It runs Flink 1.11.1 straight from the official docker image.
>            Reporter: Husky Zeng
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.3
>
> A user reported that Flink's application mode deletes HA data in case of a suspended ZooKeeper connection [1].
> The problem seems to be that the {{ApplicationDispatcherBootstrap}} class produces an exception (that the requested job can no longer be found because of a lost ZooKeeper connection) which is interpreted as a job failure. Because of this interpretation, the cluster is shut down with a terminal state of FAILED, which causes the HA data to be cleaned up. The exact problem occurs in {{JobStatusPollingUtils.getJobResult}}, which is called by {{ApplicationDispatcherBootstrap.getJobResult()}}.
> The behaviour described above can be seen in this log [2].
>
> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-metadata-deleted-by-Flink-after-ZK-connection-issues-td37937.html
> [2] https://pastebin.com/raw/uH9KDU2L

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
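The failure mode described in this ticket (a transient lookup error during a ZooKeeper suspension being interpreted as a terminal job failure) can be sketched with plain Java futures. This is an illustrative toy, not Flink's actual `JobStatusPollingUtils` code; all names here are hypothetical:

```java
import java.util.concurrent.CompletableFuture;

class NaiveResultPoller {
    enum ClusterState { SUCCEEDED, FAILED }

    // Bug pattern: any exception from the job-result future, including a
    // transient "job not found" caused by a suspended ZooKeeper connection,
    // is interpreted as a terminal job failure. A FAILED terminal state
    // then triggers HA data cleanup.
    static ClusterState interpret(CompletableFuture<String> jobResult) {
        try {
            jobResult.join();
            return ClusterState.SUCCEEDED;
        } catch (Exception e) {
            // A ZK suspension lands here and is indistinguishable
            // from a real job failure.
            return ClusterState.FAILED;
        }
    }
}
```

The fix direction is the opposite design choice: treat a failed result lookup as "unknown, retry later" rather than as a terminal state.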
[jira] [Commented] (FLINK-19154) Application mode deletes HA data in case of suspended ZooKeeper connection
[ https://issues.apache.org/jira/browse/FLINK-19154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221652#comment-17221652 ]

Cristian commented on FLINK-19154:
----------------------------------

When are you guys planning to release these changes? I tried using them (i.e. building a Flink docker image with these changes) but hit a wall: these changes are not backwards compatible. The changes to `flink-clients` (moving the AbstractDispatcherBootstrap class) mean that I need not only to upgrade the Flink cluster but also to recompile all my jobs against these new changes. Since these changes are not in Maven yet, I also need to publish the jars to my own repository, etc. What's more, since this is not backwards compatible, I guess it makes no sense to make it part of the 1.11 branch?
[jira] [Commented] (FLINK-19154) Application mode deletes HA data in case of suspended ZooKeeper connection
[ https://issues.apache.org/jira/browse/FLINK-19154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17211377#comment-17211377 ]

Cristian commented on FLINK-19154:
----------------------------------

I failed to capture the logs from the pods where this didn't happen... but it happened again to almost ALL of our jobs. This is a really nasty bug with a huge impact on production systems. I really hope it gets the traction and attention it needs. I don't know why no one else has reported it (we barely run 70 Flink jobs; I'm sure there are companies running way more).
[jira] [Commented] (FLINK-19154) Application mode deletes HA data in case of suspended ZooKeeper connection
[ https://issues.apache.org/jira/browse/FLINK-19154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17204168#comment-17204168 ]

Cristian commented on FLINK-19154:
----------------------------------

Hello guys! Good timing. This happened again yesterday, and it happened around the time one of our ZooKeeper nodes restarted (typical Kubernetes shuffling, so not a ZK issue). I would be super happy to provide more details. One interesting but challenging characteristic of this bug is that it only affected one of the more than 40 jobs we run. The other jobs just restarted and their state was preserved, but for one of the jobs we were unlucky and the job manager wiped its state out of ZK. Pretty much the same logs as stated in this ticket.
[jira] [Commented] (FLINK-11127) Make metrics query service establish connection to JobManager
[ https://issues.apache.org/jira/browse/FLINK-11127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16753461#comment-16753461 ]

Cristian commented on FLINK-11127:
----------------------------------

[~ray365] so how does the JM know how to call that service? And how does it ensure that the metrics it receives are the right ones for the TM it wants to get the metrics from?

> Make metrics query service establish connection to JobManager
> -------------------------------------------------------------
>
>                 Key: FLINK-11127
>                 URL: https://issues.apache.org/jira/browse/FLINK-11127
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination, Kubernetes, Metrics
>    Affects Versions: 1.7.0
>            Reporter: Ufuk Celebi
>            Priority: Major
>
> As part of FLINK-10247, the internal metrics query service has been separated into its own actor system. Before this change, the JobManager (JM) queried TaskManager (TM) metrics via the TM actor. Now, the JM needs to establish a separate connection to the TM metrics query service actor.
> In the context of Kubernetes, this is problematic as the JM will typically *not* be able to resolve the TMs by name, resulting in warnings as follows:
> {code}
> 2018-12-11 08:32:33,962 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183]] Caused by: [flink-task-manager-64b868487c-x9l4b: Name does not resolve]
> {code}
> In order to expose the TMs by name in Kubernetes, users would require a service *for each* TM instance, which is not practical.
> This currently results in the web UI not being able to display some basic metrics about the number of sent records. You can reproduce this by following the READMEs in {{flink-container/kubernetes}}.
> This worked before because the JM is typically exposed via a service with a known name, and the TMs establish the connection to it, which the metrics query service piggybacked on.
> A potential solution to this might be to let the query service connect to the JM, similar to how the TMs register.
> I tagged this ticket as an improvement, but in the context of Kubernetes I would consider it a bug.
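The direction reversal the ticket proposes (each TM registers its metrics endpoint with the JM, which is reachable via a well-known Kubernetes Service name, so the JM never resolves TM hostnames via DNS) can be sketched as follows. This is a hypothetical illustration, not Flink's actual registration protocol; all names are invented:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// JM-side registry of TM metrics endpoints, populated by the TMs
// themselves instead of by hostname resolution.
class MetricsEndpointRegistry {
    private final Map<String, String> endpointsByTaskManagerId = new ConcurrentHashMap<>();

    // Called by a TM on startup, piggybacking on its existing JM connection.
    // Keying by TM id also answers the question above: the JM knows which
    // TM a registered endpoint (and hence its metrics) belongs to.
    void register(String taskManagerId, String metricsAddress) {
        endpointsByTaskManagerId.put(taskManagerId, metricsAddress);
    }

    // The JM looks up the registered address instead of resolving a DNS name.
    String endpointFor(String taskManagerId) {
        return endpointsByTaskManagerId.get(taskManagerId);
    }
}
```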
[jira] [Created] (FLINK-11408) ContinuousProcessingTimeTrigger: NPE on clear() and state is lost on merge
Cristian created FLINK-11408:
--------------------------------

             Summary: ContinuousProcessingTimeTrigger: NPE on clear() and state is lost on merge
                 Key: FLINK-11408
                 URL: https://issues.apache.org/jira/browse/FLINK-11408
             Project: Flink
          Issue Type: Bug
         Environment: Both bugs are reproduced in [https://github.com/casidiablo/flink-continuous-processing-trigger-bugs], running Flink 1.7.1 locally.
            Reporter: Cristian

I was testing session windows using processing time and found a couple of problems with the ContinuousProcessingTimeTrigger.

The first one is an NPE in the clear method: [https://github.com/casidiablo/flink-continuous-processing-trigger-bugs/blob/master/src/main/java/flink/bug/Bug1.java]

The second one, which is most likely related and the root cause of the first one, is that the way state is merged for merged windows somehow confuses the trigger so that it stops firing: [https://github.com/casidiablo/flink-continuous-processing-trigger-bugs/blob/master/src/main/java/flink/bug/Bug2.java]

I managed to solve both of these by using a modified version of ContinuousProcessingTimeTrigger which does NOT call `ctx.mergePartitionedState(stateDesc);` in the onMerge method.

This is what I understand happens at the trigger level:
* The first element in the stream sets an initial fire time (the logic is in ContinuousProcessingTimeTrigger#onElement()), if there is no trigger set.
* From then on, ContinuousProcessingTimeTrigger#onProcessingTime() is responsible for scheduling the next trigger.
* When windows are merged (in the case of session windows), somehow the clear and merge methods are called using the wrong window namespace (I think this is the root cause of the bug, but I'm not too familiar with that code).
* Because the state is not cleared properly in the new window namespace, the previously scheduled trigger gets executed against the window that was cleared.
* Moreover, the new window has the state of the previous window, which means that:
## onElement will NOT schedule a fire trigger
## onProcessingTime will never be called at all
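The behaviour in the list above (the merged window inheriting the old fire-time state, so onElement never re-arms a timer) can be modeled with a toy state machine. This is an illustrative simplification, not Flink's Trigger API; all names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the trigger state machine: per-window "next fire time"
// state (kept in a ReducingState by the real trigger) and two merge
// strategies for session-window merges.
class ToyContinuousTrigger {
    private final Map<String, Long> nextFireTime = new HashMap<>(); // window -> ts
    private final long interval = 10L;

    // Mirrors onElement(): registers a timer only if none is pending, so
    // stale inherited state suppresses any future firing.
    boolean onElement(String window, long now) {
        if (!nextFireTime.containsKey(window)) {
            nextFireTime.put(window, now + interval);
            return true; // a fresh processing-time timer was registered
        }
        return false; // nothing scheduled: an old fire time is already there
    }

    // Problematic merge: carries the old window's fire time into the merged
    // window (the effect ctx.mergePartitionedState has on this state).
    void mergeKeepingState(String oldWindow, String mergedWindow) {
        Long t = nextFireTime.remove(oldWindow);
        if (t != null) {
            nextFireTime.put(mergedWindow, t);
        }
    }

    // Workaround merge: drop the old state so the next onElement re-arms.
    void mergeDroppingState(String oldWindow, String mergedWindow) {
        nextFireTime.remove(oldWindow);
    }
}
```

With the "keeping" merge, the merged window inherits a fire time whose timer was registered against the old, cleared namespace, so onElement never schedules a new one; with the "dropping" merge, the next element re-arms the timer, matching the reporter's workaround.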
[jira] [Commented] (FLINK-7786) JarActionHandler.tokenizeArguments removes all quotes from program arguments
[ https://issues.apache.org/jira/browse/FLINK-7786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423028#comment-16423028 ]

Cristian commented on FLINK-7786:
---------------------------------

It would be awesome if this were fixed/implemented. In our case, one of the parameters to the program might contain double quotes (e.g. --param "value = \"foo\" etc."), and with the current implementation in 1.4.2, as well as the one introduced in FLINK-7715, it is impossible to make this work. The way I'm planning to work around this is to receive that argument as a Base64 string, which will surely work but is a terrible bummer.

> JarActionHandler.tokenizeArguments removes all quotes from program arguments
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-7786
>                 URL: https://issues.apache.org/jira/browse/FLINK-7786
>             Project: Flink
>          Issue Type: Bug
>          Components: Job-Submission, JobManager
>    Affects Versions: 1.3.0
>            Reporter: Brice Bingman
>            Priority: Minor
>
> I would like to send JSON as an argument to my program, but when submitting it via the REST API, all the quotes in the JSON are gone, resulting in invalid JSON.
> JarActionHandler.tokenizeArguments should only remove the leading and trailing quotes of an argument, not all of the quotes.
> Current workaround: replace the quotes in the JSON with an escape character and replace them back in the run method.
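The Base64 workaround mentioned in the comment can be sketched with the JDK's java.util.Base64; the helper name is hypothetical. The client encodes the quote-bearing argument before passing it through the REST API, and the job decodes it in its main method, so the tokenizer never sees a quote:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: Base64's alphabet (A-Z, a-z, 0-9, +, /, =) contains
// no quote characters, so the encoded argument survives tokenizeArguments.
class ArgCodec {
    static String encode(String rawArg) {
        return Base64.getEncoder()
                .encodeToString(rawArg.getBytes(StandardCharsets.UTF_8));
    }

    static String decode(String encodedArg) {
        return new String(Base64.getDecoder().decode(encodedArg), StandardCharsets.UTF_8);
    }
}
```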
[jira] [Commented] (FLINK-4194) Implement isEndOfStream() for KinesisDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-4194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16370997#comment-16370997 ]

Cristian commented on FLINK-4194:
---------------------------------

Hi guys. This is a feature we miss. Given that it's not implemented and will likely not be implemented in the near future... how else could this be done right now? Let me explain: I need to run Flink applications that read from Kinesis for a specific period of time (say, two days), and I'm trying to figure out a way for the Flink app to gracefully stop itself after that. Is there a way to achieve that right now from within the Flink app? My other option, which I'd like to avoid, is to periodically check for Flink apps to kill from an external worker.

> Implement isEndOfStream() for KinesisDeserializationSchema
> ----------------------------------------------------------
>
>                 Key: FLINK-4194
>                 URL: https://issues.apache.org/jira/browse/FLINK-4194
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector
>            Reporter: Robert Metzger
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Major
>
> *Original JIRA title: KinesisDeserializationSchema.isEndOfStream() is never called. The corresponding part of the code has been commented out with reference to this JIRA.*
> The Kinesis connector does not respect the {{KinesisDeserializationSchema.isEndOfStream()}} method.
> The purpose of this method is to stop consuming from a source, based on input data.
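The commenter's time-bounded consumption would become possible once isEndOfStream() is honored; a deadline-based schema could then look like the sketch below. It uses a simplified stand-in interface rather than Flink's real KinesisDeserializationSchema (whose deserialize() takes additional arguments such as the partition key, sequence number, and shard id), and all names are hypothetical:

```java
import java.nio.charset.StandardCharsets;

// Simplified stand-in for the connector's deserialization schema.
interface ToyDeserializationSchema<T> {
    T deserialize(byte[] recordBytes);
    boolean isEndOfStream(T nextElement);
}

// Hypothetical schema that declares end-of-stream once a wall-clock
// deadline passes, so the source could shut down gracefully on its own.
class DeadlineBoundedSchema implements ToyDeserializationSchema<String> {
    private final long deadlineMillis;

    DeadlineBoundedSchema(long runForMillis) {
        this.deadlineMillis = System.currentTimeMillis() + runForMillis;
    }

    @Override
    public String deserialize(byte[] recordBytes) {
        return new String(recordBytes, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return System.currentTimeMillis() >= deadlineMillis;
    }
}
```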
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295892#comment-16295892 ]

Cristian commented on FLINK-8162:
---------------------------------

I will try to test and push a PR soon.

> Kinesis Connector to report millisBehindLatest metric
> -----------------------------------------------------
>
>                 Key: FLINK-8162
>                 URL: https://issues.apache.org/jira/browse/FLINK-8162
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>            Reporter: Cristian
>            Priority: Minor
>              Labels: kinesis
>             Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is "MillisBehindLatest" (see https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, tagging it with the shard id.
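The requested metric could be wired as one gauge per shard. The sketch below defines a minimal Gauge interface mirroring Flink's org.apache.flink.metrics.Gauge so the example stays self-contained; the consumer-side class and method names are hypothetical:

```java
// Minimal stand-in mirroring org.apache.flink.metrics.Gauge.
interface Gauge<T> {
    T getValue();
}

// Hypothetical per-shard lag gauge: the fetcher updates it with the
// millisBehindLatest value from each GetRecords response, and the metrics
// system reads it via getValue().
class ShardLagGauge implements Gauge<Long> {
    // -1 until the first GetRecords response arrives.
    private volatile long millisBehindLatest = -1L;

    void update(long millisBehindLatestFromResponse) {
        this.millisBehindLatest = millisBehindLatestFromResponse;
    }

    @Override
    public Long getValue() {
        return millisBehindLatest;
    }
}
```

Registering one such gauge per shard, in a metric group named after the shard id, would give the tagging the ticket asks for.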
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290737#comment-16290737 ]

Cristian commented on FLINK-8162:
---------------------------------

I'm running a fork of the Kinesis connector that implements this. Should I PR this? Or is this not part of the roadmap at all?
[jira] [Closed] (FLINK-8164) JobManager's archiving does not work on S3
[ https://issues.apache.org/jira/browse/FLINK-8164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cristian closed FLINK-8164.
---------------------------
       Resolution: Fixed
    Fix Version/s: 1.4.0

> JobManager's archiving does not work on S3
> ------------------------------------------
>
>                 Key: FLINK-8164
>                 URL: https://issues.apache.org/jira/browse/FLINK-8164
>             Project: Flink
>          Issue Type: Bug
>          Components: History Server, JobManager
>    Affects Versions: 1.3.2
>            Reporter: Cristian
>             Fix For: 1.4.0
>
> I'm trying to configure JobManager's archiving mechanism (https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/historyserver.html) to use S3 but I'm getting this:
> {code}
> 2017-11-28 19:11:09,751 WARN  org.apache.flink.runtime.jobmanager.MemoryArchivist - Failed to create Path for Some(s3a://bucket/completed-jobs). Job will not be archived.
> java.lang.IllegalArgumentException: No file system found with scheme s3, referenced in file URI 's3://bucket/completed-jobs'.
> 	at org.apache.flink.runtime.jobmanager.MemoryArchivist.validateAndNormalizeUri(MemoryArchivist.scala:297)
> 	at org.apache.flink.runtime.jobmanager.MemoryArchivist.org$apache$flink$runtime$jobmanager$MemoryArchivist$$archiveJsonFiles(MemoryArchivist.scala:201)
> 	at org.apache.flink.runtime.jobmanager.MemoryArchivist$$anonfun$handleMessage$1.applyOrElse(MemoryArchivist.scala:107)
> 	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> 	at org.apache.flink.runtime.jobmanager.MemoryArchivist.aroundReceive(MemoryArchivist.scala:65)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> Which is very weird, since I'm able to write to S3 from within the job itself. I have also tried using s3a instead, to no avail.
> This happens running Flink v1.3.2 on EMR.
[jira] [Commented] (FLINK-8164) JobManager's archiving does not work on S3
[ https://issues.apache.org/jira/browse/FLINK-8164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16276801#comment-16276801 ]

Cristian commented on FLINK-8164:
---------------------------------

Thanks, Chesnay! If you have time... can you point me to where in the code this lives? I wasn't able to find it.
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16273688#comment-16273688 ]

Cristian commented on FLINK-8162:
---------------------------------

Bump [~tzulitai]
[jira] [Updated] (FLINK-8164) JobManager's archiving does not work on S3
[ https://issues.apache.org/jira/browse/FLINK-8164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cristian updated FLINK-8164:
----------------------------
    Description: Anonymized the S3 bucket name in the log excerpt (s3a://scopely-flink-dev/completed-jobs -> s3a://bucket/completed-jobs, and likewise in the exception message); the stack trace and the rest of the description are otherwise unchanged from the issue description above.
[jira] [Updated] (FLINK-8164) JobManager's archiving does not work on S3
[ https://issues.apache.org/jira/browse/FLINK-8164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cristian updated FLINK-8164:
----------------------------
    Description:
I'm trying to configure the JobManager's archiving mechanism (https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/historyserver.html) to use S3, but I'm getting this:

{code}
2017-11-28 19:11:09,751 WARN org.apache.flink.runtime.jobmanager.MemoryArchivist - Failed to create Path for Some(s3a://scopely-flink-dev/completed-jobs). Job will not be archived.
java.lang.IllegalArgumentException: No file system found with scheme s3, referenced in file URI 's3://scopely-flink-dev/completed-jobs'.
	at org.apache.flink.runtime.jobmanager.MemoryArchivist.validateAndNormalizeUri(MemoryArchivist.scala:297)
	at org.apache.flink.runtime.jobmanager.MemoryArchivist.org$apache$flink$runtime$jobmanager$MemoryArchivist$$archiveJsonFiles(MemoryArchivist.scala:201)
	at org.apache.flink.runtime.jobmanager.MemoryArchivist$$anonfun$handleMessage$1.applyOrElse(MemoryArchivist.scala:107)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
	at org.apache.flink.runtime.jobmanager.MemoryArchivist.aroundReceive(MemoryArchivist.scala:65)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

This is very weird, since I'm able to write to S3 from within the job itself. I have also tried using s3a instead, to no avail. This happens running Flink v1.3.2 on EMR.

  was:
I'm trying to configure the JobManager's archiving mechanism (https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/historyserver.html) to use S3, but I'm getting this:

{code}
2017-11-28 19:11:09,751 WARN org.apache.flink.runtime.jobmanager.MemoryArchivist - Failed to create Path for Some(s3a://scopely-flink-dev/completed-jobs). Job will not be archived.
java.lang.IllegalArgumentException: No file system found with scheme s3a, referenced in file URI 's3a://scopely-flink-dev/completed-jobs'.
	at org.apache.flink.runtime.jobmanager.MemoryArchivist.validateAndNormalizeUri(MemoryArchivist.scala:297)
	at org.apache.flink.runtime.jobmanager.MemoryArchivist.org$apache$flink$runtime$jobmanager$MemoryArchivist$$archiveJsonFiles(MemoryArchivist.scala:201)
	at org.apache.flink.runtime.jobmanager.MemoryArchivist$$anonfun$handleMessage$1.applyOrElse(MemoryArchivist.scala:107)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
	at org.apache.flink.runtime.jobmanager.MemoryArchivist.aroundReceive(MemoryArchivist.scala:65)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

This is very weird, since I'm able to write to S3 from within the job itself. This happens running Flink v1.3.2 on EMR.
[jira] [Created] (FLINK-8164) JobManager's archiving does not work on S3
Cristian created FLINK-8164:
-------------------------------
             Summary: JobManager's archiving does not work on S3
                 Key: FLINK-8164
                 URL: https://issues.apache.org/jira/browse/FLINK-8164
             Project: Flink
          Issue Type: Bug
          Components: JobManager
            Reporter: Cristian

I'm trying to configure the JobManager's archiving mechanism (https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/historyserver.html) to use S3, but I'm getting this:

{code}
2017-11-28 19:11:09,751 WARN org.apache.flink.runtime.jobmanager.MemoryArchivist - Failed to create Path for Some(s3a://scopely-flink-dev/completed-jobs). Job will not be archived.
java.lang.IllegalArgumentException: No file system found with scheme s3a, referenced in file URI 's3a://scopely-flink-dev/completed-jobs'.
	at org.apache.flink.runtime.jobmanager.MemoryArchivist.validateAndNormalizeUri(MemoryArchivist.scala:297)
	at org.apache.flink.runtime.jobmanager.MemoryArchivist.org$apache$flink$runtime$jobmanager$MemoryArchivist$$archiveJsonFiles(MemoryArchivist.scala:201)
	at org.apache.flink.runtime.jobmanager.MemoryArchivist$$anonfun$handleMessage$1.applyOrElse(MemoryArchivist.scala:107)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
	at org.apache.flink.runtime.jobmanager.MemoryArchivist.aroundReceive(MemoryArchivist.scala:65)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

This is very weird, since I'm able to write to S3 from within the job itself. This happens running Flink v1.3.2 on EMR.

-- 
This message was sent by Atlassian JIRA (v6.4.14#64029)
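For illustration, here is a minimal, self-contained sketch (not Flink's actual Scala code) of the kind of check that {{MemoryArchivist.validateAndNormalizeUri}} performs: the archive directory's URI scheme must map to a file system implementation known to the JobManager's classpath. The set of known schemes below is a hypothetical example of a JVM without S3 support, which would reproduce the reported error even though jobs on the same cluster (which load Hadoop's S3 support differently) can write to S3 fine.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Set;

// Hypothetical stand-in for the JobManager-side URI validation described in
// the report; class and method names here are illustrative, not Flink's.
public class ArchiveUriCheck {
    // Assumption for illustration: a JVM whose classpath only provides the
    // local and HDFS file systems, i.e. no s3/s3a support registered.
    private static final Set<String> KNOWN_SCHEMES = Set.of("file", "hdfs");

    static String validate(String dir) {
        final URI uri;
        try {
            uri = new URI(dir);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Malformed archive URI: " + dir, e);
        }
        // A scheme-less URI falls back to the local file system.
        String scheme = uri.getScheme() == null ? "file" : uri.getScheme();
        if (!KNOWN_SCHEMES.contains(scheme)) {
            // Mirrors the error in the report: the scheme is only usable if a
            // matching file system implementation is on the classpath.
            throw new IllegalArgumentException("No file system found with scheme "
                    + scheme + ", referenced in file URI '" + dir + "'.");
        }
        return scheme;
    }

    public static void main(String[] args) {
        System.out.println(validate("hdfs://namenode/completed-jobs"));
        try {
            validate("s3a://scopely-flink-dev/completed-jobs");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

This also illustrates why switching between {{s3}} and {{s3a}} does not help: the problem is not the scheme spelling but that the JobManager's class loader has no file system registered for either scheme.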
[jira] [Updated] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cristian updated FLINK-8162:
----------------------------
    Description:
When reading from Kinesis streams, one of the most valuable metrics is "MillisBehindLatest" (see https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201). Flink should use its metrics mechanism to report this value as a gauge, tagging it with the shard id.

  was:
When reading from Kinesis streams, one of the most valuable metrics is "MillisBehindLatest" ([see](https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201)). Flink should use its metrics mechanism to report this value as a gauge, tagging it with the shard id.

> Kinesis Connector to report millisBehindLatest metric
> -----------------------------------------------------
>
>                 Key: FLINK-8162
>                 URL: https://issues.apache.org/jira/browse/FLINK-8162
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>            Reporter: Cristian
>            Priority: Minor
>              Labels: kinesis
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is "MillisBehindLatest" (see https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201). Flink should use its metrics mechanism to report this value as a gauge, tagging it with the shard id.

-- 
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
Cristian created FLINK-8162:
-------------------------------
             Summary: Kinesis Connector to report millisBehindLatest metric
                 Key: FLINK-8162
                 URL: https://issues.apache.org/jira/browse/FLINK-8162
             Project: Flink
          Issue Type: Improvement
          Components: Kinesis Connector
            Reporter: Cristian
            Priority: Minor

When reading from Kinesis streams, one of the most valuable metrics is "MillisBehindLatest" ([see](https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201)). Flink should use its metrics mechanism to report this value as a gauge, tagging it with the shard id.

-- 
This message was sent by Atlassian JIRA (v6.4.14#64029)
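The proposal can be sketched as follows. Flink's real metric API lives in {{org.apache.flink.metrics}}; the {{Gauge}} interface and metric group below are minimal stand-ins so the example runs on its own, and the field and class names are illustrative, not the connector's actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Self-contained sketch of reporting Kinesis's millisBehindLatest as a gauge,
// one gauge per shard. Stand-in types, not Flink's org.apache.flink.metrics API.
public class MillisBehindLatestSketch {

    // Stand-in for org.apache.flink.metrics.Gauge<T>.
    interface Gauge<T> { T getValue(); }

    // Stand-in for a per-shard metric group (in Flink this would be obtained
    // via the runtime context and tagged with the shard id).
    static class MetricGroup {
        final Map<String, Gauge<?>> gauges = new HashMap<>();
        <T> void gauge(String name, Gauge<T> g) { gauges.put(name, g); }
    }

    // Hypothetical per-shard consumer that stores the most recent lag value
    // read from GetRecordsResult.getMillisBehindLatest().
    static class ShardConsumer {
        volatile long millisBehindLatest = -1L; // -1: no record fetched yet

        void register(MetricGroup shardGroup) {
            // The gauge is a lambda, so each scrape reads the latest value.
            shardGroup.gauge("millisBehindLatest", () -> millisBehindLatest);
        }
    }

    public static void main(String[] args) {
        MetricGroup shardGroup = new MetricGroup();
        ShardConsumer consumer = new ShardConsumer();
        consumer.register(shardGroup);
        consumer.millisBehindLatest = 1234L; // value from a GetRecords response
        System.out.println(shardGroup.gauges.get("millisBehindLatest").getValue());
    }
}
```

The key design point is that a gauge is pull-based: the consumer just updates a volatile field on every {{GetRecords}} call, and the metrics reporter reads it on its own schedule, so no extra work happens on the hot fetch path.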
[jira] [Commented] (FLINK-6294) BucketingSink throws NPE while cancelling job
[ https://issues.apache.org/jira/browse/FLINK-6294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261228#comment-16261228 ]

Cristian commented on FLINK-6294:
---------------------------------
This is happening to me on Flink 1.3.2. The line is different, but I guess it's the same problem:

{code:java}
2017-11-21 16:55:16,276 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
java.lang.NullPointerException
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.close(BucketingSink.java:423)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)
{code}

> BucketingSink throws NPE while cancelling job
> ---------------------------------------------
>
>                 Key: FLINK-6294
>                 URL: https://issues.apache.org/jira/browse/FLINK-6294
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.2.0
>            Reporter: Andrey
>
> Steps to reproduce:
> * configure BucketingSink and run job
> * cancel job from UI before processing any messages
> * in logs:
> {code}
> 2017-04-11 10:14:54,681 INFO org.apache.flink.core.fs.FileSystem - Ensuring all FileSystem streams are closed for Source: Custom Source (1/2) [Source: Custom Source (1/2)]
> 2017-04-11 10:14:54,881 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source (56d0c9ffe06dc3e4481e7ce530d9894f) [flink-akka.actor.default-dispatcher-4]
> 2017-04-11 10:14:56,584 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator. [Flat Map -> Sink: Unnamed (2/2)]
> java.lang.NullPointerException
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.close(BucketingSink.java:422)
> 	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}

-- 
This message was sent by Atlassian JIRA (v6.4.14#64029)
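The reproduction steps point at a lifecycle gap: the job is cancelled before any record arrives, so {{close()}} runs against state that {{open()}} would normally have initialized. A minimal, self-contained sketch of the defensive pattern (hypothetical class and field names, not the actual {{BucketingSink}} code):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sink illustrating the null-guard pattern for close()-before-open().
// In the reported bug, BucketingSink.close() dereferenced state that open()
// had not yet initialized because the task was cancelled immediately.
public class GuardedSink {
    private List<String> inProgressFiles; // initialized in open(), null until then

    public void open() {
        inProgressFiles = new ArrayList<>();
    }

    public void close() {
        // Guard: close() may be invoked during disposal of a task that was
        // cancelled before open() ran (or before any element was processed).
        if (inProgressFiles == null) {
            return;
        }
        for (String file : inProgressFiles) {
            // flush and close each in-progress file here
        }
        inProgressFiles = null; // make close() idempotent
    }

    public static void main(String[] args) {
        GuardedSink sink = new GuardedSink();
        sink.close(); // cancelled before open(): no NPE with the guard in place
        System.out.println("closed without NPE");
    }
}
```

The same guard also makes repeated disposal safe, which matters because {{StreamTask.disposeAllOperators}} runs on both normal shutdown and failure paths.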
[jira] [Created] (FLINK-8124) EventTimeTrigger (and other triggers) could have less specific generic types
Cristian created FLINK-8124:
-------------------------------
             Summary: EventTimeTrigger (and other triggers) could have less specific generic types
                 Key: FLINK-8124
                 URL: https://issues.apache.org/jira/browse/FLINK-8124
             Project: Flink
          Issue Type: Improvement
          Components: Core
            Reporter: Cristian
            Priority: Minor

When implementing custom WindowAssigners, it is possible to need different implementations of the {{Window}} class (other than {{TimeWindow}}). In such cases, it is not possible to use the existing triggers (e.g. {{EventTimeTrigger}}) because it extends from {{Trigger<Object, TimeWindow>}}.
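The generics issue can be illustrated with a self-contained sketch. {{Window}}, {{TimeWindow}}, and {{Trigger}} below are minimal stand-ins for Flink's classes, and {{SessionWindow}} is a hypothetical custom window type; the point is that a trigger declared against the broader bound {{Trigger<Object, Window>}} works with any {{Window}} subclass, whereas one declared against {{Trigger<Object, TimeWindow>}} does not.

```java
// Self-contained illustration of the proposed less-specific generic bound.
// All types here are stand-ins, not the actual Flink classes.
public class TriggerGenerics {
    static class Window {}
    static class TimeWindow extends Window {}
    static class SessionWindow extends Window {} // hypothetical custom Window

    // Stand-in for org.apache.flink.streaming.api.windowing.triggers.Trigger.
    static abstract class Trigger<T, W extends Window> {
        abstract boolean onElement(T element, W window);
    }

    // Declared against Window rather than TimeWindow, so it is usable with
    // ANY window type -- the relaxation the issue proposes.
    static class GenericEventTimeTrigger extends Trigger<Object, Window> {
        @Override
        boolean onElement(Object element, Window window) {
            return true; // fire unconditionally, for demonstration only
        }
    }

    public static void main(String[] args) {
        GenericEventTimeTrigger trigger = new GenericEventTimeTrigger();
        // Works with both the built-in and the custom window type; a trigger
        // bound to TimeWindow would not compile against SessionWindow.
        System.out.println(trigger.onElement("a", new TimeWindow()));
        System.out.println(trigger.onElement("b", new SessionWindow()));
    }
}
```

Since a trigger never needs the concrete window subtype for anything beyond what {{Window}} exposes, widening the declared type parameter is source-compatible for existing callers while unlocking custom window implementations.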