Re: How can I handle backpressure with event time.

2017-11-05 Thread yunfan123
It seems that consuming multiple topics with different deserialization schemas
is not supported.
I use protobuf, and each topic maps to a different schema.
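
A possible workaround sketch (not something Flink provides out of the box): a single
KeyedDeserializationSchema that dispatches on the topic name. Event, OrderProto and
UserProto below are hypothetical placeholders for your own wrapper type and
protobuf-generated classes.

import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// "Event" stands for whatever common wrapper/superclass the per-topic protobuf
// messages can be mapped to; it is a placeholder, as are OrderProto and UserProto.
public class MultiTopicProtoSchema implements KeyedDeserializationSchema<Event> {

    @Override
    public Event deserialize(byte[] messageKey, byte[] message,
                             String topic, int partition, long offset) throws IOException {
        // Dispatch on the topic name and parse with the matching protobuf schema.
        switch (topic) {
            case "orders":
                return Event.of(OrderProto.Order.parseFrom(message));
            case "users":
                return Event.of(UserProto.User.parseFrom(message));
            default:
                throw new IOException("No deserializer registered for topic " + topic);
        }
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}

// Usage sketch: subscribe to all topics with the one schema, e.g.
//   new FlinkKafkaConsumer010<>(java.util.Arrays.asList("orders", "users"),
//                               new MultiTopicProtoSchema(), kafkaProperties);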





ExecutionGraph not serializable

2017-11-05 Thread XiangWei Huang
Hi Flink users,
The Flink JobManager throws a NotSerializableException when I use the
JobMasterGateway to get the ExecutionGraph of a specific job with the
message RequestJob(jobID). Below is the detail of the exception:


[ERROR] [akka.remote.EndpointWriter] - Transient association error (association remains live)
java.io.NotSerializableException: org.apache.flink.runtime.executiongraph.ExecutionGraph
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
    at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
    at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
    at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
    at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
    at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:874)
    at akka.remote.EndpointWriter.writeSend(Endpoint.scala:769)
    at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:744)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

So, is this a bug, or is the way I am fetching the job's ExecutionGraph invalid?
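
Not a fix for the serialization problem itself, but a possible workaround sketch: an
overview of the job's execution graph can also be read as JSON from the JobManager's
monitoring REST API under /jobs/<jobid>. The host, port and job id below are
placeholders, not values from this thread.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class JobDetailsFetcher {

    public static void main(String[] args) throws Exception {
        // Assumed JobManager web/monitoring address; pass the job id as the first argument.
        String jobManager = "http://localhost:8081";
        String jobId = args[0];

        HttpURLConnection conn =
            (HttpURLConnection) new URL(jobManager + "/jobs/" + jobId).openConnection();
        conn.setRequestMethod("GET");

        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line); // JSON with the job's vertices, state, timestamps, ...
            }
        } finally {
            conn.disconnect();
        }
    }
}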

Best,
XiangWei



Kafka Consumer fetch-size/rate and Producer queue timeout

2017-11-05 Thread Ashish Pokharel
All,

I am starting to notice a strange behavior in a particular streaming app. I
initially thought it was a producer issue, as I was seeing timeout exceptions
(records expiring in the queue). I did try to modify request.timeout.ms,
linger.ms etc. to help with the issue in case it was caused by a sudden burst
of data or something along those lines. However, that only caused the app to
build up more backpressure and become slower and slower until the timeout was
reached. With lower timeouts, the app would actually raise the exception and
recover faster. I can tell it is not related to connectivity, as other apps
are running just fine around the same time frame connected to the same brokers
(we have at least 10 streaming apps connected to the same list of brokers)
from the same data nodes. We have enabled the Graphite reporter in all of our
applications. After deep diving into some of the consumer and producer stats,
I noticed that the consumer fetch-rate drops tremendously while fetch-size
grows exponentially BEFORE the producer actually starts to show higher
response times and lower rates. Eventually, connection resets start to occur
and connection counts go up momentarily, after which things get back to
normal. Data producer rates remain constant around that timeframe - we have a
Logstash producer sending data over. We checked both Logstash and Kafka
metrics and they seem to show the same pattern (a sort of sine wave)
throughout.

It seems to point to a Kafka issue (perhaps some tuning between the Flink app
and Kafka), but I wanted to check with the experts before I start knocking on
the Kafka admin's doors. Is there anything else I can look into? There are
quite a few default stats in Graphite, but those were the ones that made the
most sense.
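
For reference, a minimal sketch of the kind of producer settings discussed above
(all values are illustrative assumptions, not recommendations); the resulting
Properties object is what gets handed to the Flink Kafka producer:

import java.util.Properties;

public class ProducerTuningSketch {

    // Illustrative values only, not recommendations.
    public static Properties producerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092"); // assumed broker list
        // In the 0.10.x producer, batches waiting in the send buffer are expired
        // based on this timeout ("records expiring in queue").
        props.setProperty("request.timeout.ms", "60000");
        // A longer linger batches more records per request at the cost of latency.
        props.setProperty("linger.ms", "50");
        // A larger batch size and buffer can absorb short bursts instead of expiring records.
        props.setProperty("batch.size", "65536");
        props.setProperty("buffer.memory", "134217728");
        return props;
    }
}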

Thanks, Ashish

Re: Flink memory usage

2017-11-05 Thread Jürgen Thomann
Can you use wget (curl will work as well)? You can list the taskmanagers with
wget -O - http://localhost:8081/taskmanagers
and then run
wget -O - http://localhost:8081/taskmanagers/<taskmanager-id from the first request>
to see detailed JVM memory stats. In my example, localhost:8081 is the jobmanager.


On 04.11.2017 16:19, AndreaKinn wrote:

Anyway, if I understood correctly how system metrics work (the results seem to
be shown in a browser), I can't use them because my cluster is only accessible
via ssh from a terminal.





RE: Negative values using latency marker

2017-11-05 Thread Sofer, Tovi
Hi Nico, 

Actually, the run below is on my local machine, and both Kafka and Flink run on
it.

Thanks and regards,
Tovi
-Original Message-
From: Nico Kruber [mailto:n...@data-artisans.com] 
Sent: Friday, 03 November 2017 15:22
To: user@flink.apache.org
Cc: Sofer, Tovi [ICG-IT] 
Subject: Re: Negative values using latency marker

Hi Tovi,
if I see this correctly, the LatencyMarker gets its initial timestamp during 
creation at the source, and the latency is reported as a metric at a sink by 
comparing that initial timestamp with the current time.
If the clocks of the two machines involved diverge, e.g. the sink's clock 
falling behind, the difference may be negative.
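
A minimal illustration of that arithmetic (plain Java, not Flink's internal
classes), assuming a 5 ms clock skew and negligible transit time:

public class LatencySkewExample {
    public static void main(String[] args) {
        // Marker timestamp taken with the source machine's wall clock.
        long markerCreationTime = 1_000_000L;
        // The sink machine's wall clock lags the source's by 5 ms in this example.
        long sinkWallClock = markerCreationTime - 5;
        long reportedLatency = sinkWallClock - markerCreationTime;
        System.out.println(reportedLatency); // prints -5, matching the values quoted below
    }
}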


Nico

On Thursday, 2 November 2017 17:58:51 CET Sofer, Tovi  wrote:
> Hi group,
> 
> Can someone maybe elaborate on how the latency gauge shown by the latency
> marker can be negative?
> 
> 2017-11-02 18:54:56,842 INFO
> com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Sink: FinalSink.0.latency: {LatencySourceDescriptor{vertexID=1,
> subtaskIndex=0}={p99=-5.0, p50=-5.0, min=-5.0, max=-5.0, p95=-5.0, 
> mean=-5.0}, LatencySourceDescriptor{vertexID=1, 
> subtaskIndex=1}={p99=-5.0, p50=-5.0, min=-5.0, max=-5.0, p95=-5.0, 
> mean=-5.0}, LatencySourceDescriptor{vertexID=1, 
> subtaskIndex=2}={p99=-6.0, p50=-6.0, min=-6.0, max=-6.0, p95=-6.0, 
> mean=-6.0}, LatencySourceDescriptor{vertexID=1, 
> subtaskIndex=3}={p99=-6.0, p50=-6.0, min=-6.0, max=-6.0, p95=-6.0, 
> mean=-6.0}} 2017-11-02 18:54:56,843 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.60SecWarmUpRecordsCounter: 2858446
> 2017-11-02 18:54:56,843 INFO
> com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Source: fixTopicConsumerSource.3.numRecordsOut: 1954784 2017-11-02
> 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.ActualRecordsCounter: 4962675 
> 2017-11-02
> 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.AverageLatencyMs: 0.0753785 2017-11-02
> 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.HighLatencyMsgPercentage: 0.5918576
> 2017-11-02 18:54:56,843 INFO
> com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Source: fixTopicConsumerSource.0.numRecordsOutPerSecond:
> 12943.1167 2017-11-02 18:54:56,843 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.numRecordsInPerSecond: 51751.4 
> 2017-11-02
> 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Source: fixTopicConsumerSource.3.numRecordsOutPerSecond: 12935.05
> 2017-11-02 18:54:56,843 INFO
> com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Source: fixTopicConsumerSource.2.numRecordsOutPerSecond:
> 12946.9166 2017-11-02 18:54:56,843 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Source: fixTopicConsumerSource.1.numRecordsOutPerSecond:
> 12926.3168 2017-11-02 18:54:56,844 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.LatencyHistogram: count:1 min:24753
> max:19199891 mean:77637.6484 stddev:341333.9414842662 p50:40752.0
> p75:49809.0 p95:190480.95 p98:539110.819994 p99:749224.889995
> p999:3817927.9259998496
> 
> Regards,
> Tovi



Re: JobManager web interface redirect strategy when running in HA

2017-11-05 Thread Jürgen Thomann
I think you can solve this already with a health check (health monitor 
in OpenStack?).
I'm currently using GET requests to / and if they don't reply with a 200 code 
the LB will not use them. Only the leader answers with a 200 code, whereas the 
others send a redirect with a 30x code, which should ensure that requests 
always go to the leader.
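
A minimal sketch of such a probe (plain Java; the URL is just one of the routes
mentioned in this thread): it treats a jobmanager as healthy only if / answers
200 directly, without following the 30x redirect a non-leader returns.

import java.net.HttpURLConnection;
import java.net.URL;

public class LeaderProbe {

    // Returns true only if this jobmanager answers / with 200 itself (i.e. it is the leader).
    static boolean isLeader(String baseUrl) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(baseUrl + "/").openConnection();
        conn.setInstanceFollowRedirects(false); // a non-leader replies with a 30x redirect
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(2000);
        conn.setReadTimeout(2000);
        int code = conn.getResponseCode();
        conn.disconnect();
        return code == 200;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(isLeader("http://flink-1.domain.tld"));
    }
}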



On 01.11.2017 12:34, Chesnay Schepler wrote:
We intend to change the redirection behavior such that any jobmanager 
(leading or not) can accept requests and communicate internally with the 
leader. In this model you could set up flink.domain.tld to point to any 
jobmanager (or distribute requests among them).


Would this work for you?

I believe this is targeted for 1.5.

On 31.10.2017 13:55, mrooding wrote:

Hi

We're running 3 job managers in high availability cluster mode backed by
OpenStack/OpenShift. We're currently exposing all 3 job managers using 3
different routes (flink-1.domain.tld, flink-2.domain.tld, flink-3.domain.tld).
When accessing the route for a job manager which isn't the leader, it
automatically redirects the user to the host and port of the leading job
manager. From what I've seen in the source code, the rpc address and port are
being used for the redirect. Since the internal hostnames are not accessible
outside the cluster, this obviously doesn't work.

The nicest solution would be a single route (flink.domain.tld) which would
correctly delegate requests to the leading job manager. The second best
solution would probably be the possibility to declare a public URL in the
flink configuration file.

I'd be more than happy to contribute to Flink and add support for this, but
I'd love to hear your ideas about it.

Kind regards

Marc










--
Jürgen Thomann
Software Developer


InnoGames GmbH
Friesenstraße 13 - 20097 Hamburg - Germany
Tel +49 40 7889335-0

Managing Directors: Hendrik Klindworth, Michael Zillmer
VAT-ID: DE264068907 Amtsgericht Hamburg, HRB 108973

http://www.innogames.com – juergen.thom...@innogames.com