Re: How can I handle backpressure with event time.
It seems the Kafka consumer can't consume multiple topics with different deserialization schemas. I use protobuf, and each topic maps to a different schema. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
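One way to get this effect with a single consumer is to dispatch on the record's topic inside the deserialization step. The sketch below shows only the dispatch idea in plain Java (the class and topic names are hypothetical); in Flink this logic would live inside a deserialization schema that receives the record's topic, with the registered functions standing in for per-topic protobuf parsers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Route each record to a per-topic parser. The parser map stands in for
// per-topic protobuf parse methods (e.g. OrderProto::parseFrom).
public class TopicDispatch {
    private final Map<String, Function<byte[], Object>> parsers = new HashMap<>();

    public void register(String topic, Function<byte[], Object> parser) {
        parsers.put(topic, parser);
    }

    public Object deserialize(String topic, byte[] payload) {
        Function<byte[], Object> parser = parsers.get(topic);
        if (parser == null) {
            throw new IllegalArgumentException("No schema registered for topic " + topic);
        }
        return parser.apply(payload);
    }
}
```

The downstream stream is then of a common supertype (here `Object`; a shared interface over the generated protobuf classes would be more typical), which the job can split again by type if needed.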
ExecutionGraph not serializable
Hi Flink users,

The Flink JobManager throws a NotSerializableException when I use JobMasterGateway to get the ExecutionGraph of a specific job via the message RequestJob(jobID). Below is the detail of the exception:

[ERROR] [akka.remote.EndpointWriter] - Transient association error (association remains live)
java.io.NotSerializableException: org.apache.flink.runtime.executiongraph.ExecutionGraph
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
    at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
    at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
    at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
    at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
    at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:874)
    at akka.remote.EndpointWriter.writeSend(Endpoint.scala:769)
    at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:744)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

So, is this a bug, or is this way of getting a job's ExecutionGraph invalid?

Best,
XiangWei
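The stack trace shows Akka's JavaSerializer walking the reply's object graph and hitting ExecutionGraph, which does not implement Serializable. The failure mode is easy to reproduce in isolation; the classes below are illustrative stand-ins, not Flink types:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// One non-serializable object anywhere in the graph fails the whole write,
// just as a message carrying an ExecutionGraph fails Akka's Java serialization.
class NotSerializablePart { }               // does NOT implement Serializable

class Reply implements Serializable {
    Object payload = new NotSerializablePart();  // the poison field
}

public class SerializationDemo {
    public static boolean canSerialize(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;                    // what the JobManager log showed
        } catch (IOException e) {
            return false;
        }
    }
}
```

This suggests the message carrying the raw ExecutionGraph simply cannot cross the actor boundary; whether RequestJob is meant to return a serializable representation instead is the question for the Flink developers.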
Kafka Consumer fetch-size/rate and Producer queue timeout
All,

I am starting to notice strange behavior in a particular streaming app. I initially thought it was a producer issue, as I was seeing timeout exceptions (records expiring in the queue). I tried modifying request.timeout.ms, linger.ms, etc. to help, in case the issue were caused by a sudden burst of data or something along those lines. However, this caused backpressure in the app to increase and made it slower and slower until that timeout was reached. With lower timeouts, the app would actually raise an exception and recover faster. I can tell it is not related to connectivity, as other apps running around the same time frame against the same brokers (we have at least 10 streaming apps connected to the same list of brokers) from the same data nodes are just fine.

We have enabled the Graphite reporter in all of our applications. After deep diving into some of the consumer and producer stats, I noticed that the consumer fetch-rate drops tremendously while the fetch-size grows exponentially BEFORE the producer actually starts to show higher response times and lower rates. Eventually, connection resets start to occur and connection counts go up momentarily. After that, things get back to normal.

Data producer rates remain constant around that time frame - we have a Logstash producer sending data over. We checked both Logstash and Kafka metrics and they seem to show the same pattern (a sort of sine wave) throughout. It seems to point to a Kafka issue (perhaps some tuning between the Flink app and Kafka), but I wanted to check with the experts before I start knocking on the Kafka admins' doors. Is there anything else I can look into? There are quite a few default stats in Graphite, but those were the ones that made the most sense.

Thanks,
Ashish
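For reference, the producer settings mentioned above are plain Kafka client properties passed to the Flink Kafka producer. The values below are illustrative starting points for the trade-off described (a shorter request.timeout.ms surfaces the "records expiring" error quickly instead of letting backpressure build, while linger/batch settings absorb short bursts) - not tuned recommendations, and the broker address is hypothetical:

```java
import java.util.Properties;

// Illustrative producer tuning for burst absorption vs. fail-fast behavior.
public class ProducerTuning {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092"); // hypothetical broker
        props.setProperty("request.timeout.ms", "30000"); // fail fast rather than stall the app
        props.setProperty("linger.ms", "50");             // small batching delay
        props.setProperty("batch.size", "65536");         // larger batches during bursts
        props.setProperty("buffer.memory", "67108864");   // headroom for transient spikes
        return props;
    }
}
```

Given that the consumer fetch-rate drops before the producer degrades, though, the bottleneck may be upstream of the producer entirely, in which case producer tuning only moves the symptom.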
Re: Flink memory usage
Can you use wget (curl will work as well)? You can list the taskmanagers with wget -O - http://localhost:8081/taskmanagers and use wget -O - http://localhost:8081/taskmanagers/<taskmanager-id> to see detailed JVM memory stats. In my example, localhost:8081 is the jobmanager. On 04.11.2017 16:19, AndreaKinn wrote: Anyway, if I understood correctly how system metrics work (the results seem to be shown in a browser), I can't use them because my cluster is accessible only via terminal over SSH
RE: Negative values using latency marker
Hi Nico,

Actually the run below is on my local machine, and both Kafka and Flink run on it.

Thanks and regards,
Tovi

-----Original Message-----
From: Nico Kruber [mailto:n...@data-artisans.com]
Sent: Friday, 3 November 2017 15:22
To: user@flink.apache.org
Cc: Sofer, Tovi [ICG-IT]
Subject: Re: Negative values using latency marker

Hi Tovi,

if I see this correctly, the LatencyMarker gets its initial timestamp during creation at the source, and the latency is reported as a metric at a sink by comparing the initial timestamp with the current time. If the clocks of the two machines involved diverge, e.g. the sink's clock falling behind, the difference may be negative.

Nico

On Thursday, 2 November 2017 17:58:51 CET Sofer, Tovi wrote:
> Hi group,
>
> Can someone maybe elaborate how the latency gauge shown by the latency marker can be negative?
>
> 2017-11-02 18:54:56,842 INFO com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.Sink: FinalSink.0.latency: {LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=-5.0, p50=-5.0, min=-5.0, max=-5.0, p95=-5.0, mean=-5.0}, LatencySourceDescriptor{vertexID=1, subtaskIndex=1}={p99=-5.0, p50=-5.0, min=-5.0, max=-5.0, p95=-5.0, mean=-5.0}, LatencySourceDescriptor{vertexID=1, subtaskIndex=2}={p99=-6.0, p50=-6.0, min=-6.0, max=-6.0, p95=-6.0, mean=-6.0}, LatencySourceDescriptor{vertexID=1, subtaskIndex=3}={p99=-6.0, p50=-6.0, min=-6.0, max=-6.0, p95=-6.0, mean=-6.0}}
> 2017-11-02 18:54:56,843 INFO com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.AverageE2ELatencyChecker.0.60SecWarmUpRecordsCounter: 2858446
> 2017-11-02 18:54:56,843 INFO com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.Source: fixTopicConsumerSource.3.numRecordsOut: 1954784
> 2017-11-02 18:54:56,843 INFO com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.AverageE2ELatencyChecker.0.ActualRecordsCounter: 4962675
> 2017-11-02 18:54:56,843 INFO com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.AverageE2ELatencyChecker.0.AverageLatencyMs: 0.0753785
> 2017-11-02 18:54:56,843 INFO com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.AverageE2ELatencyChecker.0.HighLatencyMsgPercentage: 0.5918576
> 2017-11-02 18:54:56,843 INFO com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.Source: fixTopicConsumerSource.0.numRecordsOutPerSecond: 12943.1167
> 2017-11-02 18:54:56,843 INFO com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.AverageE2ELatencyChecker.0.numRecordsInPerSecond: 51751.4
> 2017-11-02 18:54:56,843 INFO com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.Source: fixTopicConsumerSource.3.numRecordsOutPerSecond: 12935.05
> 2017-11-02 18:54:56,843 INFO com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.Source: fixTopicConsumerSource.2.numRecordsOutPerSecond: 12946.9166
> 2017-11-02 18:54:56,843 INFO com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.Source: fixTopicConsumerSource.1.numRecordsOutPerSecond: 12926.3168
> 2017-11-02 18:54:56,844 INFO com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.AverageE2ELatencyChecker.0.LatencyHistogram: count:1 min:24753 max:19199891 mean:77637.6484 stddev:341333.9414842662 p50:40752.0 p75:49809.0 p95:190480.95 p98:539110.819994 p99:749224.889995 p999:3817927.9259998496
>
> Regards,
> Tovi
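Nico's explanation boils down to simple arithmetic over two wall clocks. A minimal illustration (not Flink code; the method names are mine):

```java
// A latency marker is stamped with the SOURCE machine's clock and compared
// against the SINK machine's clock. If the sink's clock lags the source's,
// the computed latency is negative -- as in the -5/-6 ms values in the log.
public class LatencySkew {
    /** Latency as the sink computes it: the sink's "now" minus the
     *  marker's creation timestamp (both in milliseconds). */
    public static long observedLatency(long markerTimestampAtSource,
                                       long sinkWallClockNow) {
        return sinkWallClockNow - markerTimestampAtSource;
    }
}
```

On a single machine, as in Tovi's run, both timestamps come from the same clock, so skew between machines is ruled out; only sub-millisecond rounding or clock adjustments (e.g. NTP stepping the clock) would remain as candidates for small negative values.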
Re: JobManager web interface redirect strategy when running in HA
I think you can already solve this with a health check (a health monitor in OpenStack?). I'm currently using GET requests to / and if they don't reply with a 200 code the LB will not use them. Only the leader answers with a 200 code, whereas the others send a redirect with a 30x code, which should ensure that requests always go to the leader. On 01.11.2017 12:34, Chesnay Schepler wrote: We intend to change the redirection behavior such that any jobmanager (leading or not) can accept requests and communicate internally with the leader. In this model you could set up flink.domain.tld to point to any jobmanager (or distribute requests among them). Would this work for you? I believe this is targeted for 1.5. On 31.10.2017 13:55, mrooding wrote: Hi, we're running 3 job managers in high-availability cluster mode backed by OpenStack/OpenShift. We're currently exposing all 3 job managers using 3 different routes (flink-1.domain.tld, flink-2.domain.tld, flink-3.domain.tld). When accessing the route for a job manager which isn't the leader, it automatically redirects the user to the host and port of the leading job manager. From what I've seen in the source code, the RPC address and port are used for the redirect. Since the internal hostnames are not accessible outside the cluster, this obviously doesn't work. The nicest solution would be a single route (flink.domain.tld) which would correctly delegate requests to the leading job manager. The second-best solution would probably be the possibility to declare a public URL in the Flink configuration file. I'd be more than happy to contribute to Flink and add support for this, but I'd love to hear your ideas about it.
Kind regards
Marc

--
Jürgen Thomann
Software Developer
InnoGames GmbH
Friesenstraße 13 - 20097 Hamburg - Germany
Tel +49 40 7889335-0
Managing Directors: Hendrik Klindworth, Michael Zillmer
VAT-ID: DE264068907
Amtsgericht Hamburg, HRB 108973
http://www.innogames.com – juergen.thom...@innogames.com
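The health-check approach above can be sketched in a few lines. This is an illustration of the load-balancer probe logic, not Flink or OpenStack code; the URL is hypothetical, and the status-code rule is factored out so it can be checked in isolation:

```java
import java.net.HttpURLConnection;
import java.net.URL;

// Probe a JobManager: treat it as healthy (the leader) only on a 200 from
// GET /; a 30x redirect (a non-leader pointing at the leader) is unhealthy.
public class LeaderCheck {
    public static boolean isLeader(int httpStatus) {
        return httpStatus == 200;            // only the leader serves the UI directly
    }

    // Example probe (needs a reachable JobManager, e.g. "http://flink-1.domain.tld/"):
    public static boolean probe(String url) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setInstanceFollowRedirects(false);  // observe the 30x, don't follow it
        conn.setRequestMethod("GET");
        return isLeader(conn.getResponseCode());
    }
}
```

Disabling redirect-following is the important detail: with redirects followed, a probe against a non-leader would transparently land on the leader and report a misleading 200.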