[jira] [Commented] (FLINK-11127) Make metrics query service establish connection to JobManager
[ https://issues.apache.org/jira/browse/FLINK-11127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724737#comment-16724737 ]

Nagarjun Guraja commented on FLINK-11127:
-----------------------------------------

[~spoganshev] I am wondering, modifying the docker entrypoint script to first configure *taskmanager.host* with the pod IP and then invoke taskmanager.sh should also do the trick instead of using an init container, right? Do you see any issue with that approach, other than baking the workaround into the docker image as opposed to handling it externally?

> Make metrics query service establish connection to JobManager
> --------------------------------------------------------------
>
>                 Key: FLINK-11127
>                 URL: https://issues.apache.org/jira/browse/FLINK-11127
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination, Kubernetes, Metrics
>    Affects Versions: 1.7.0
>            Reporter: Ufuk Celebi
>            Priority: Major
>
> As part of FLINK-10247, the internal metrics query service has been separated into its own actor system. Before this change, the JobManager (JM) queried TaskManager (TM) metrics via the TM actor. Now, the JM needs to establish a separate connection to the TM metrics query service actor.
> In the context of Kubernetes, this is problematic as the JM will typically *not* be able to resolve the TMs by name, resulting in warnings as follows:
> {code}
> 2018-12-11 08:32:33,962 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183]] Caused by: [flink-task-manager-64b868487c-x9l4b: Name does not resolve]
> {code}
> In order to expose the TMs by name in Kubernetes, users would require a service *for each* TM instance, which is not practical.
> This currently results in the web UI not being able to display some basic metrics, such as the number of sent records. You can reproduce this by following the READMEs in {{flink-container/kubernetes}}.
> This worked before because the JM is typically exposed via a service with a known name, and the TMs establish the connection to it, which the metrics query service piggybacked on.
> A potential solution to this might be to let the query service connect to the JM, similar to how the TMs register.
> I tagged this ticket as an improvement, but in the context of Kubernetes I would consider this to be a bug.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-8570) Provide a hook in Flink KafkaConsumer(source function) implementation to override assignment of kafka partitions to individual task nodes
[ https://issues.apache.org/jira/browse/FLINK-8570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nagarjun Guraja updated FLINK-8570:
-----------------------------------
    Description: 
This hook can be leveraged in co-assigning and managing the association between KeyGroups and Kafka partitions. This is required to exploit use cases where streams are pre-partitioned at the Kafka layer.

> Provide a hook in Flink KafkaConsumer (source function) implementation to override assignment of Kafka partitions to individual task nodes
> -------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-8570
>                 URL: https://issues.apache.org/jira/browse/FLINK-8570
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Nagarjun Guraja
>            Priority: Major
>
> This hook can be leveraged in co-assigning and managing the association between KeyGroups and Kafka partitions. This is required to exploit use cases where streams are pre-partitioned at the Kafka layer.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
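For context on what the requested hook would override: the consumer distributes Kafka partitions over subtasks with a fixed scheme that cannot be customized. A simplified, illustrative sketch (the class and method names here are hypothetical, and the real assigner also mixes a hash of the topic name into the distribution):

```java
// Hypothetical sketch of the fixed partition-to-subtask distribution the
// ticket asks to make overridable. Names are illustrative, not Flink's API;
// the real consumer additionally offsets by a hash of the topic name.
public class PartitionAssignmentSketch {

    // Maps a Kafka partition id to the consumer subtask that will read it.
    static int subtaskFor(int kafkaPartition, int parallelism) {
        return kafkaPartition % parallelism;
    }

    public static void main(String[] args) {
        // With parallelism 3, partitions wrap around the subtasks in order,
        // with no way to align them with Flink's KeyGroup assignment.
        for (int partition = 0; partition < 6; partition++) {
            System.out.println("partition " + partition
                    + " -> subtask " + subtaskFor(partition, 3));
        }
    }
}
```

A pluggable hook would let a job replace `subtaskFor` so that a pre-partitioned topic lands on the subtasks that own the corresponding key groups.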
[jira] [Created] (FLINK-8571) Provide an enhanced KeyedStream implementation to use ForwardPartitioner
Nagarjun Guraja created FLINK-8571:
--------------------------------------

             Summary: Provide an enhanced KeyedStream implementation to use ForwardPartitioner
                 Key: FLINK-8571
                 URL: https://issues.apache.org/jira/browse/FLINK-8571
             Project: Flink
          Issue Type: Improvement
            Reporter: Nagarjun Guraja

This enhancement would help in modeling problems with pre-partitioned input sources (e.g., Kafka with keyed topics). It would help in making the job graph embarrassingly parallel while leveraging the RocksDB state backend as well as fine-grained recovery semantics.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-8570) Provide a hook in Flink KafkaConsumer(source function) implementation to override assignment of kafka partitions to individual task nodes
Nagarjun Guraja created FLINK-8570:
--------------------------------------

             Summary: Provide a hook in Flink KafkaConsumer (source function) implementation to override assignment of Kafka partitions to individual task nodes
                 Key: FLINK-8570
                 URL: https://issues.apache.org/jira/browse/FLINK-8570
             Project: Flink
          Issue Type: Improvement
            Reporter: Nagarjun Guraja

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-8569) Provide a hook to override the default KeyGroupRangeAssignment
Nagarjun Guraja created FLINK-8569:
--------------------------------------

             Summary: Provide a hook to override the default KeyGroupRangeAssignment
                 Key: FLINK-8569
                 URL: https://issues.apache.org/jira/browse/FLINK-8569
             Project: Flink
          Issue Type: Improvement
            Reporter: Nagarjun Guraja

Currently the class 'org.apache.flink.runtime.state.KeyGroupRangeAssignment' has static methods (not pluggable), which is prohibitive and unintuitive when onboarding some keyed, embarrassingly parallel jobs. It would be helpful if a per-job hook were provided to make this pluggable.

--
This message was sent by Atlassian JIRA
(v7.6.3#6332)
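For readers unfamiliar with the class in question, the assignment the ticket wants to make pluggable is hard-wired in static methods. A simplified sketch of what KeyGroupRangeAssignment computes (the real implementation murmur-hashes the key's hashCode via MathUtils.murmurHash; a plain floorMod and shortened method names are used here for brevity):

```java
// Simplified sketch of org.apache.flink.runtime.state.KeyGroupRangeAssignment.
// Names shortened and hashing simplified; the static nature is the point:
// there is no seam to plug in a job-specific assignment.
public class KeyGroupSketch {

    // Maps a key to one of [0, maxParallelism) key groups.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Maps a key group to the operator subtask that owns it.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128, parallelism = 4;
        int group = assignToKeyGroup("some-key", maxParallelism);
        int subtask = operatorIndexForKeyGroup(maxParallelism, parallelism, group);
        System.out.println("key group " + group + " -> subtask " + subtask);
    }
}
```

Because both steps are static, a job that already knows how its input is partitioned cannot substitute its own mapping without forking the class, which is what the requested hook would avoid.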
[jira] [Commented] (FLINK-4650) Frequent task manager disconnects from JobManager
[ https://issues.apache.org/jira/browse/FLINK-4650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15530607#comment-15530607 ]

Nagarjun Guraja commented on FLINK-4650:
----------------------------------------

Hey [~uce], we talked about this briefly in our meeting with Stephan today. We are planning to pull in the latest master fork sometime soon, redeploy one of our jobs, and monitor this. We will reach out as soon as we see this behavior on the new build. Let's keep this JIRA open until then!

> Frequent task manager disconnects from JobManager
> --------------------------------------------------
>
>                 Key: FLINK-4650
>                 URL: https://issues.apache.org/jira/browse/FLINK-4650
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination, Network
>            Reporter: Nagarjun Guraja
>
> Not sure of the exact reason, but we observe more frequent task manager disconnects while using the 1.2 snapshot build as compared to the 1.1.2 release build.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-3409) Integrate STOP with Savepoints
[ https://issues.apache.org/jira/browse/FLINK-3409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15530225#comment-15530225 ]

Nagarjun Guraja commented on FLINK-3409:
----------------------------------------

A first version could be: Flink stops automatic checkpoints, makes sure that there are no further checkpoints after the savepoint is taken, and then stops/cancels the job.

> Integrate STOP with Savepoints
> ------------------------------
>
>                 Key: FLINK-3409
>                 URL: https://issues.apache.org/jira/browse/FLINK-3409
>             Project: Flink
>          Issue Type: Improvement
>          Components: Client, JobManager
>            Reporter: Matthias J. Sax
>
> Right now, if a savepoint is triggered the job keeps running. At the same time, on STOP a last checkpoint should be collected for a clean shutdown (WIP: https://issues.apache.org/jira/browse/FLINK-3408).
> This work should enable stopping a job and getting a final savepoint at the same time. Thus, a job can get stopped gracefully and resumed later on using the gathered checkpoint.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-4650) Frequent task manager disconnects from JobManager
[ https://issues.apache.org/jira/browse/FLINK-4650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15511311#comment-15511311 ]

Nagarjun Guraja commented on FLINK-4650:
----------------------------------------

[~StephanEwen] I haven't spent a lot of time debugging it on 1.2-SNAPSHOT, but the stack traces are similar to the one below. (The node was reachable and there were no issues with network connectivity.)

{code}
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'titus-248496-worker-0-2/100.82.8.187:56858'. This might indicate that the remote task manager was lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:118)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829)
	at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:610)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	at java.lang.Thread.run(Thread.java:745)
{code}

Do you want us to look for any specific log messages to identify the root cause?

> Frequent task manager disconnects from JobManager
> --------------------------------------------------
>
>                 Key: FLINK-4650
>                 URL: https://issues.apache.org/jira/browse/FLINK-4650
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Nagarjun Guraja
>
> Not sure of the exact reason, but we observe more frequent task manager disconnects while using the 1.2 snapshot build as compared to the 1.1.2 release build.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Created] (FLINK-4650) Frequent task manager disconnects from JobManager
Nagarjun Guraja created FLINK-4650:
--------------------------------------

             Summary: Frequent task manager disconnects from JobManager
                 Key: FLINK-4650
                 URL: https://issues.apache.org/jira/browse/FLINK-4650
             Project: Flink
          Issue Type: Bug
            Reporter: Nagarjun Guraja

Not sure of the exact reason, but we observe more frequent task manager disconnects while using the 1.2 snapshot build as compared to the 1.1.2 release build.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (FLINK-4596) RESTART_STRATEGY is not really pluggable
[ https://issues.apache.org/jira/browse/FLINK-4596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nagarjun Guraja updated FLINK-4596:
-----------------------------------
    Description: 
The standalone cluster config accepts an implementation (class) as part of the YAML config file, but that does not work either as a cluster-level restart strategy or as a streaming-job-level restart strategy.

CLUSTER LEVEL CAUSE: createRestartStrategyFactory converts the configured value of the strategy name to lowercase and searches for the class name using the lower-cased string.

JOB LEVEL CAUSE: Checkpointed streams have specific code to add a fixeddelayrestartconfiguration if no RestartConfiguration is specified in the job env. Also, jobs cannot provide their own custom restart strategy implementation and are constrained to pick one of the three restart strategies provided by Flink.

FIX: Do not lower-case the strategy config value, support a new restart configuration that falls back to the cluster-level restart strategy, and support jobs providing a custom implementation of the strategy class itself.

  was:
CAUSE: createRestartStrategyFactory converts the configured value of the strategy name to lowercase and searches for the class name using the lower-cased string.

FIX: Do not lower-case the strategy config value, or lower-case it for the switch case alone.

> RESTART_STRATEGY is not really pluggable
> ----------------------------------------
>
>                 Key: FLINK-4596
>                 URL: https://issues.apache.org/jira/browse/FLINK-4596
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Nagarjun Guraja
>
> The standalone cluster config accepts an implementation (class) as part of the YAML config file, but that does not work either as a cluster-level restart strategy or as a streaming-job-level restart strategy.
> CLUSTER LEVEL CAUSE: createRestartStrategyFactory converts the configured value of the strategy name to lowercase and searches for the class name using the lower-cased string.
> JOB LEVEL CAUSE: Checkpointed streams have specific code to add a fixeddelayrestartconfiguration if no RestartConfiguration is specified in the job env. Also, jobs cannot provide their own custom restart strategy implementation and are constrained to pick one of the three restart strategies provided by Flink.
> FIX: Do not lower-case the strategy config value, support a new restart configuration that falls back to the cluster-level restart strategy, and support jobs providing a custom implementation of the strategy class itself.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
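The cluster-level cause described above can be illustrated with a small sketch (the names below are hypothetical, modeled on the described createRestartStrategyFactory behavior): lower-casing the configured value before the lookup means a fully qualified class name can never resolve, since Java class names are case-sensitive.

```java
// Hypothetical sketch of the lookup bug: the configured value is lower-cased
// before matching, so "com.example.MyRestartStrategy" becomes
// "com.example.myrestartstrategy" and Class.forName can never find it.
public class RestartStrategySketch {

    static String resolve(String configuredValue) {
        String value = configuredValue.toLowerCase(); // the problematic step
        switch (value) {
            case "none":        return "NoRestartStrategy";
            case "fixeddelay":  return "FixedDelayRestartStrategy";
            case "failurerate": return "FailureRateRestartStrategy";
            default:
                try {
                    Class.forName(value); // looked up with the lower-cased name
                    return value;
                } catch (ClassNotFoundException e) {
                    return "ClassNotFoundException: " + value;
                }
        }
    }

    public static void main(String[] args) {
        // The built-in names survive lower-casing; a custom FQCN does not.
        System.out.println(resolve("fixedDelay"));
        System.out.println(resolve("com.example.MyRestartStrategy"));
    }
}
```

The proposed fix follows directly: match the built-in names case-insensitively if desired, but pass the original, unmodified string to the class lookup.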
[jira] [Updated] (FLINK-4596) RESTART_STRATEGY is not really pluggable
[ https://issues.apache.org/jira/browse/FLINK-4596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nagarjun Guraja updated FLINK-4596:
-----------------------------------
    Summary: RESTART_STRATEGY is not really pluggable  (was: Class not found exception when RESTART_STRATEGY is configured with fully qualified class name in the yaml)

> RESTART_STRATEGY is not really pluggable
> ----------------------------------------
>
>                 Key: FLINK-4596
>                 URL: https://issues.apache.org/jira/browse/FLINK-4596
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Nagarjun Guraja
>
> CAUSE: createRestartStrategyFactory converts the configured value of the strategy name to lowercase and searches for the class name using the lower-cased string.
> FIX: Do not lower-case the strategy config value, or lower-case it for the switch case alone.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Created] (FLINK-4596) Class not found exception when RESTART_STRATEGY is configured with fully qualified class name in the yaml
Nagarjun Guraja created FLINK-4596:
--------------------------------------

             Summary: Class not found exception when RESTART_STRATEGY is configured with fully qualified class name in the yaml
                 Key: FLINK-4596
                 URL: https://issues.apache.org/jira/browse/FLINK-4596
             Project: Flink
          Issue Type: Bug
            Reporter: Nagarjun Guraja

CAUSE: createRestartStrategyFactory converts the configured value of the strategy name to lowercase and searches for the class name using the lower-cased string.

FIX: Do not lower-case the strategy config value, or lower-case it for the switch case alone.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Created] (FLINK-4515) org.apache.flink.runtime.jobmanager.JobInfo class is not backwards compatible with 1.1 released version
Nagarjun Guraja created FLINK-4515:
--------------------------------------

             Summary: org.apache.flink.runtime.jobmanager.JobInfo class is not backwards compatible with 1.1 released version
                 Key: FLINK-4515
                 URL: https://issues.apache.org/jira/browse/FLINK-4515
             Project: Flink
          Issue Type: Bug
            Reporter: Nagarjun Guraja

The current org.apache.flink.runtime.jobmanager.JobInfo in the 1.2 trunk is not backwards compatible, which breaks job recovery when upgrading from the 1.1 release to the latest Flink build.

{code}
2016-08-26 13:39:56,618 INFO  org.apache.flink.runtime.jobmanager.JobManager - Attempting to recover all jobs.
2016-08-26 13:39:57,225 ERROR org.apache.flink.runtime.jobmanager.JobManager - Fatal error: Failed to recover jobs.
java.io.InvalidClassException: org.apache.flink.runtime.jobmanager.JobInfo; local class incompatible: stream classdesc serialVersionUID = 4102282956967236682, local class serialVersionUID = -2377916285980374169
	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
	at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:58)
	at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
	at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraphs(ZooKeeperSubmittedJobGraphStore.java:173)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply$mcV$sp(JobManager.scala:543)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:539)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:539)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply$mcV$sp(JobManager.scala:539)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:535)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:535)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
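The exception above is the standard Java serialization guard: the serialVersionUID computed for the 1.2 class no longer matches the UID stored in streams written by 1.1. A minimal sketch of the usual remedy, pinning an explicit UID so that compatible class evolution keeps old streams readable (the class name and field here are illustrative, not the real JobInfo; the UID value is the one from the log above):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Illustrative class, not Flink's JobInfo: pinning serialVersionUID to the
// value old streams carry prevents java.io from rejecting them after
// compatible changes (added methods, reordered members) to the class.
public class JobInfoSketch implements Serializable {
    private static final long serialVersionUID = 4102282956967236682L;

    String jobName = "example";

    public static void main(String[] args) throws Exception {
        // Round-trip through Java serialization: the UID written into the
        // stream matches the local class's pinned UID, so reading succeeds.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(new JobInfoSketch());
        JobInfoSketch back = (JobInfoSketch) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();
        System.out.println(back.jobName);
    }
}
```

Without an explicit UID, the JVM derives one from the class's shape, so any structural change between 1.1 and 1.2 produces the mismatch seen in the recovery log.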