Re: Issues running multiple Jobs using the same JAR

2021-03-01 Thread Morgan Geldenhuys

That solved it, thank you very much Kezhu :)

On 28.02.21 16:12, Kezhu Wang wrote:

Hi Morgan,

You could check FLINK-11654; from its description, I think it is the problem you encountered.


> We run multiple jobs on a cluster which write a lot to the same 
Kafka topic from identically named sinks. When EXACTLY_ONCE semantic 
is enabled for the KafkaProducers we run into a lot of 
ProducerFencedExceptions and all jobs go into a restart cycle.


FLINK-11654: https://issues.apache.org/jira/browse/FLINK-11654
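
(For reference, a minimal sketch of the kind of workaround the ticket points at -- not taken from the thread: if the transactional IDs are derived from the task name and operator uid, as FLINK-11654 describes, then giving each job's exactly-once sink a distinct name/uid keeps the IDs from colliding. Topic, properties and uids below are placeholders; "stream" stands for the DataStream<String> being written.)

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "kafka:9092");    // placeholder
// keep this at or below the broker's transaction.max.timeout.ms
producerProps.setProperty("transaction.timeout.ms", "900000");

FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
        "output-topic",                                           // placeholder topic
        (KafkaSerializationSchema<String>) (element, timestamp) ->
                new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8)),
        producerProps,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

// Distinct per job ("job-a-...", "job-b-...") so the derived transactional
// IDs differ even though both jobs are started from the same JAR.
stream.addSink(sink)
        .name("job-a-kafka-sink")
        .uid("job-a-kafka-sink");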


Best,
Kezhu Wang


On February 28, 2021 at 22:35:02, Morgan Geldenhuys (morgan.geldenh...@tu-berlin.de) wrote:



Greetings all,

I am having an issue instantiating multiple Flink jobs using the same JAR in the same native Flink cluster (all 1.12.1).



Issues running multiple Jobs using the same JAR

2021-02-28 Thread Morgan Geldenhuys

Greetings all,

I am having an issue instantiating multiple Flink jobs using the same JAR in the same native Flink cluster (all 1.12.1).


When processing events, the jobs fail with the following trace:

org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
at 
org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
at 
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
at 
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.send(FlinkKafkaInternalProducer.java:133)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:915)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:187)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.base/java.lang.Thread.run(Unknown Source)
Suppressed: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:965)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)

... 3 more
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
Suppressed: java.lang.IllegalStateException: Pending record count must be zero at this point: 1
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1090)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:925)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.base/java.lang.Thread.run(Unknown Source)
Suppressed: java.lang.IllegalStateException: Pending record count must be zero at this point: 1
at 
org

Request: Documentation for External Communication with Flink Cluster

2020-06-15 Thread Morgan Geldenhuys

Hi Community,

I am interested in creating an external client for submitting and managing Flink jobs via an HTTP/REST endpoint. Taking a look at the documentation, external communication is possible with the Dispatcher and JobManager (https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-ssl.html#external--rest-connectivity). That is great; however, where is the documentation for the REST API?
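
(The endpoints themselves are described on the "Monitoring REST API" page of the Flink documentation; the Dispatcher serves them on rest.port, 8081 by default. A minimal, hedged sketch of talking to it with Java 11's HttpClient -- address and endpoints assumed from that page:)

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class FlinkRestClient {
    public static void main(String[] args) throws Exception {
        String base = "http://localhost:8081";   // assumed JobManager REST address

        HttpClient client = HttpClient.newHttpClient();

        // List all jobs and their status (GET /jobs/overview).
        HttpRequest listJobs = HttpRequest.newBuilder(URI.create(base + "/jobs/overview"))
                .GET()
                .build();
        HttpResponse<String> response = client.send(listJobs, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());

        // Submission works the same way: POST the JAR to /jars/upload (multipart/form-data)
        // and then POST to /jars/<jarid>/run with the program arguments.
    }
}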


Thanks in advance!

Regards,
M.


Flink on Kubernetes unable to Recover from failure

2020-05-05 Thread Morgan Geldenhuys


Community,

I am currently doing some fault tolerance testing for Flink (1.10) running on Kubernetes (1.18) and am encountering a problem where, after a running job experiences a failure, the job fails completely.


A Flink session cluster has been created according to the documentation 
contained here: 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html. 
The job is then uploaded and deployed via the web interface and everything runs smoothly. The job has a parallelism of 24, with 3 worker nodes held in reserve as failovers. Each worker is assigned 1 task slot (27 in total).


The next step is to inject a failure, for which I use the Pumba chaos testing tool (https://github.com/alexei-led/pumba) to pause a random worker process. The selection and pausing are done manually for the moment.


Looking at the error logs, Flink does detect the error after the timeout 
(The heartbeat timeout has been set to 20 seconds):


java.util.concurrent.TimeoutException: The heartbeat of TaskManager with 
id 768848f91ebdbccc8d518e910160414d  timed out.


After the failure has been detected, the system resets to the latest 
saved checkpoint and restarts. The system catches up nicely and resumes 
normal processing... however, after about 3 minutes, the following error 
occurs:


org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 
'/10.45.128.1:6121'. This might indicate that the remote task manager 
was lost.


The job fails and is unable to restart because the number of available task slots has been reduced to zero. Looking at the Kubernetes cluster, all containers are running...


Has anyone else run into this error? What am I missing? The same thing 
happens when the containers are deleted.


Regards,
M.









A Strategy for Capacity Testing

2020-04-23 Thread Morgan Geldenhuys

Community,

I am interested in knowing the recommended way of capacity planning a particular Flink application given its current resource allocation. Taking a look at the Flink documentation 
(https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#capacity-planning), 
extra resources need to be allocated on top of what has already been assigned for normal operations, for when failures occur. The amount of extra resources will determine how quickly the application can catch up to the head of the input stream, e.g. Kafka, considering event-time processing.


So, as far as I know, the recommended way of testing the maximum capacity of the system is to slowly increase the ingestion rate to find the point just before backpressure kicks in.


Would starting the job at a timestamp far enough in the past that the system is forced to catch up for a few minutes, and then taking the average ingress rate over that period, be a sufficient strategy for determining the maximum number of messages that can be processed?


Thank you in advance! Have a great day!

Regards,
M.


Failure detection and Heartbeats

2020-03-10 Thread Morgan Geldenhuys

Hi community,

I am interested in knowing more about the failure detection mechanism used by Flink; unfortunately, information is a little thin on the ground and I was hoping someone could shed a little light on the topic.


Looking at the documentation 
(https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html), 
there are these two configuration options:



heartbeat.interval: 10000 (Long) - Time interval for requesting heartbeat from sender side.

heartbeat.timeout: 50000 (Long) - Timeout for requesting and receiving heartbeat for both sender and receiver sides.


This would indicate Flink uses a heartbeat mechanism to ascertain the 
liveness of TaskManagers. From this the following assumptions are made:


- The JobManager is responsible for broadcasting heartbeat requests to all TaskManagers and awaits responses.
- If a response is not forthcoming from any particular node within the heartbeat timeout period, e.g. 50 seconds by default, then that node is timed out and assumed to have failed.
- The heartbeat interval indicates how often the heartbeat request broadcast is scheduled.
- Having the heartbeat interval shorter than the heartbeat timeout means that multiple requests can be underway at the same time.
- Therefore, the TaskManager would need to fail to respond to 4 requests (assuming normal response times are lower than 10 seconds) before being timed out after 50 seconds.


Therefore, if a failure were to occur (considering the default settings):
- In the best case, the JobManager would detect the failure in the shortest time, i.e. ~50 seconds (the node fails just before receiving the next heartbeat request).
- In the worst case, the JobManager would detect the failure in the longest time, i.e. ~60 seconds (the node fails just after sending the last heartbeat response).
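
(For reference, the two settings live in flink-conf.yaml; a hedged sketch with the defaults and a tighter variant -- the values are illustrative, and shortening them trades faster detection for more false positives on GC pauses or network hiccups:)

heartbeat.interval: 10000   # default: schedule a heartbeat request every 10 s
heartbeat.timeout: 50000    # default: ~50 s without a response marks the TaskManager as failed

# tighter detection, e.g. for experiments:
# heartbeat.interval: 5000
# heartbeat.timeout: 20000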


Is this correct?

For JobManagers in HA mode, this is left to ZooKeeper timeouts, which then initiate a round of elections, and the new leader picks up from the previous checkpoint.


Thank you in advance.

Regards,
M.









Re: How to determine average utilization before backpressure kicks in?

2020-02-25 Thread Morgan Geldenhuys

Hi Roman,

Thank you for the reply.

Yes, I am aware that backpressure can be the result of many factors, and yes, this is an oversimplification of something very complex, so please bear with me. Let's assume that this has been taken into account and has lowered the threshold for when this status permanently comes into effect, i.e. HIGH.


Example: The system is running along perfectly fine under normal conditions, accessing external sources, and processing at an average of 100,000 messages/sec. Let's assume the maximum capacity is around 130,000 messages/sec before backpressure starts propagating back up the stream. Therefore, utilization is at 0.76 (100K/130K). Great, but at present we don't know that 130,000 is the limit.


For this example, or for any job, is there a way of finding this maximum capacity (and hence the utilization) without pushing the system to its limit based on the current throughput? Possibly by measuring (as you say) the saturation of certain buffers (looking into this now; however, I am not too familiar with Flink internals)? It doesn't have to be extremely precise. Any hints would be greatly appreciated.
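
(A hedged pointer, not from the thread: the per-task network buffer gauges are probably the closest thing to the saturation measure described above. Under the default scope formats they show up roughly as the following series, with the pool-usage gauges in the 0.0-1.0 range; depending on the reporter there may be an additional prefix such as flink_:)

taskmanager_job_task_buffers_inPoolUsage
taskmanager_job_task_buffers_outPoolUsage
taskmanager_job_task_buffers_inputQueueLength
taskmanager_job_task_buffers_outputQueueLength

An outPoolUsage sitting near 1.0 on one task, with high inPoolUsage on the task downstream of it, is the usual sign that that link is the bottleneck.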


Regards,
M.

On 25.02.20 13:34, Khachatryan Roman wrote:

Hi Morgan,

Regarding backpressure, it can be caused by a number of factors, e.g. 
writing to an external system or slow input partitions.


However, if you know that a particular resource is a bottleneck then 
it makes sense to monitor its saturation.
It can be done by using Flink metrics. Please see the documentation 
for more details:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html

Regards,
Roman









How to determine average utilization before backpressure kicks in?

2020-02-25 Thread Morgan Geldenhuys

Hello community,

I am fairly new to Flink and have a question concerning utilization. I 
was hoping someone could help.


Knowing that backpressure is essentially the point at which utilization 
has reached 100% for any particular streaming pipeline and means that 
the application cannot "keep up" with the messages coming into the system.


I was wondering, assuming a fairly stable input throughput, is there a 
way of determining the average utilization as a percentage? Can we 
determine how much more capacity each operator has before backpressure 
kicks in from metrics alone, i.e. 60% of capacity for example? Knowing 
that the maximum throughput of the DSP application is dictated by the 
slowest part of the pipeline, we would need to identify the slowest 
operator and then average horizontally.


The only method that I can see of determining the point at which the 
system cannot keep up any longer is by scaling the input throughput 
slowly until the backpressure HIGH alarm is shown and hence the number 
of messages/sec is known.


Yes I know this is a gross oversimplification and there are many many 
factors that need to be taken into account when dealing with 
backpressure, but it would be nice to have a general indicator, a rough 
estimate is fine.


Thank you in advance.

Regards,
M.





Identifying Flink Operators of the Latency Metric

2020-02-18 Thread Morgan Geldenhuys

Hi All,

I have set up monitoring for Flink (1.9.2) via Prometheus and am interested in viewing the end-to-end latency at the sink operators for the 95th percentile. I have enabled latency markers at the operator level and can see the results; one of the entries looks as follows:


flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{app="flink",component="taskmanager",host="flink_taskmanager_6bdc8fc49_kr4bs",instance="10.244.18.2:",job="kubernetes-pods",job_id="96d32d8e380dc267bd69403fd7e20adf",job_name="Traffic",kubernetes_namespace="default",kubernetes_pod_name="flink-taskmanager-6bdc8fc49-kr4bs",operator_id="2e32dc82f03b1df764824a4773219c97",operator_subtask_index="7",pod_template_hash="6bdc8fc49",quantile="0.95",source_id="cbc357ccb763df2852fee8c4fc7d55f2",tm_id="7fb02c0ed734ed1815fa51373457434f"}

That is great, however... I am unable to determine which of the 
operators is the sink operator I'm looking for based solely on the 
operator_id. Is there a way of determining this?
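
(A hedged way to do the mapping with what is already exported: operator-scoped metrics other than latency carry an operator_name label alongside operator_id, so any of them can serve as a lookup table -- e.g., assuming the default Prometheus scope formats, a series along the lines of:)

flink_taskmanager_job_task_operator_numRecordsIn{job_name="Traffic"}

Each sample of such a series carries both operator_id and operator_name; matching the operator_id (and source_id) from the latency series against it should identify which operator is the sink.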


Regards,
M.


Re: Question: Determining Total Recovery Time

2020-02-11 Thread Morgan Geldenhuys

Thanks for the advice, i will look into it.

I had a quick think about another simple solution, but we would need a hook into the checkpoint process from the task/operator perspective, which I haven't looked into yet. It would work like this:


- The sink operators (?) would keep a local copy of the last message processed (or a digest?), the current timestamp, and a boolean value indicating whether or not the system is in recovery.
- While not in recovery, update the local copy and timestamp with each new event processed.
- When a failure is detected and the TaskManagers are notified to roll back, we use the hook into this process to switch the boolean value to true.
- While true, each new message is compared with the last one processed before the recovery process was initiated.
- When a match is found, the difference between the previous and current timestamps is calculated and output as a custom metric, and the boolean is reset to false.


From here, the mean total recovery time could be calculated across the operators. I'm not sure how it would impact performance, but I doubt it would be significant. We would need to ensure exactly-once so that the message is guaranteed to be seen again. Thoughts?
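
(A very rough sketch of such a sink -- entirely hypothetical: the "recovery started" hook does not exist as a public API and is stubbed out below, the gauge name is made up, and for the last-seen value to survive the restart it would really have to live in checkpointed state, which is glossed over here:)

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class RecoveryTimeMeasuringSink extends RichSinkFunction<String> {

    private String lastSeenBeforeRecovery;   // last message (or digest) processed before the rollback
    private long lastSeenTimestamp;          // wall-clock time of that message
    private volatile boolean inRecovery;     // would be flipped by the hypothetical rollback hook
    private long lastRecoveryTimeMs;         // exposed as a custom gauge

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext()
                .getMetricGroup()
                .gauge("totalRecoveryTimeMs", (Gauge<Long>) () -> lastRecoveryTimeMs);
    }

    @Override
    public void invoke(String value, Context context) {
        if (inRecovery) {
            // replayed records: wait until the last pre-failure message shows up again
            if (value.equals(lastSeenBeforeRecovery)) {
                lastRecoveryTimeMs = System.currentTimeMillis() - lastSeenTimestamp;
                inRecovery = false;
            }
        } else {
            lastSeenBeforeRecovery = value;
            lastSeenTimestamp = System.currentTimeMillis();
        }
    }

    // Hypothetical hook: would have to be called when the task is notified of a rollback.
    public void onRecoveryStarted() {
        inRecovery = true;
    }
}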


On 11.02.20 08:57, Arvid Heise wrote:

Hi Morgan,

as Timo pointed out, there is no general solution, but in your setting you could look at the consumer lag of the input topic after a crash. Lag would spike until all tasks have restarted and reprocessing begins. Offsets are only committed on checkpoints, though, by default.
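
(A hedged addition: the lag should already be visible without extra tooling, since Flink forwards the Kafka consumer's own gauges under the operator's KafkaConsumer metric group, so something along the lines of:)

taskmanager_job_task_operator_KafkaConsumer_records-lag-max

Watching that gauge spike at the failure and fall back to its normal level gives roughly the catch-up point described above; the exact series name depends on the reporter's character replacement.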


Best,

Arvid

On Tue, Feb 4, 2020 at 12:32 PM Timo Walther (twal...@apache.org) wrote:


Hi Morgan,

as far as I know this is not possible, mostly because measuring "till the point when the system catches up to the last message" is very pipeline/connector dependent. Some sources might need to read from the very beginning, some just continue from the latest checkpointed offset.

Measuring things like that (e.g. for experiments) might require collecting your own metrics as part of your pipeline definition.

Regards,
Timo







Re: Flink TaskManager Logs polluted with InfluxDB metrics reporter errors

2020-02-06 Thread Morgan Geldenhuys


The only difference that I can tell is that the Kubernetes cluster was upgraded from 1.14 to 1.17; however, I rolled this back to test and got the same result on the older version.


I've created a stripped-down job which produces the errors:

// create a testing sink
private static class CollectSink implements SinkFunction<String> {

    // must be static
    public static final List<String> values = new ArrayList<>();

    @Override
    public synchronized void invoke(String value) throws Exception {
        values.add(value);
    }
}

public static void main(String[] args) throws Exception {

    Properties props = Resources.GET.read("iot_traffic_processor.properties", Properties.class);

    // setup Kafka consumer
    Properties kafkaConsumerProps = new Properties();
    kafkaConsumerProps.setProperty("bootstrap.servers", props.getProperty("kafka.brokerList"));
    kafkaConsumerProps.setProperty("group.id", props.getProperty("kafka.group"));

    FlinkKafkaConsumer<String> myConsumer =
            new FlinkKafkaConsumer<>(
                    props.getProperty("kafka.consumerTopic"),
                    new SimpleStringSchema(),
                    kafkaConsumerProps);

    // set up streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setAutoWatermarkInterval(1000L);

    // create direct kafka stream
    DataStream<String> trafficEventStream = env.addSource(myConsumer);

    DataStream<String> mapStream =
            trafficEventStream
                    .map(new RichMapFunction<String, String>() {
                        @Override
                        public String map(String value) throws Exception {
                            return value;
                        }
                    });

    mapStream.addSink(new CollectSink());

    env.execute("Traffic");
}


Dependencies have also been stripped down to a minimum:

 
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-uber_2.12</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.9</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>




On 06.02.20 10:58, Chesnay Schepler wrote:

Setup-wise, are there any differences to what you had a few months ago?

On 06/02/2020 10:40, Morgan Geldenhuys wrote:


Further info: the Flink cluster (1.9) is running on Kubernetes (1.17) with InfluxDB.


I have tried the following images for InfluxDB: 
docker.io/influxdb:1.6.4 and influxdb:latest


When going into the database and showing the series, there are really 
weird results:


> show series
key
---
jobmanager_Status_JVM_CPU_Load,host=flink-jobmanager
jobmanager_Status_JVM_CPU_Time,host=flink-jobmanager
jobmanager_Status_JVM_ClassLoader_ClassesLoaded,host=flink-jobmanager
jobmanager_Status_JVM_ClassLoader_ClassesUnloaded,host=flink-jobmanager
jobmanager_Status_JVM_GarbageCollector_Copy_Count,host=flink-jobmanager
jobmanager_Status_JVM_GarbageCollector_Copy_Time,host=flink-jobmanager
jobmanager_Status_JVM_GarbageCollector_MarkSweepCompact_Count,host=flink-jobmanager
jobmanager_Status_JVM_GarbageCollector_MarkSweepCompact_Time,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Direct_Count,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Direct_MemoryUsed,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Direct_TotalCapacity,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Heap_Committed,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Heap_Max,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Heap_Used,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Mapped_Count,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Mapped_MemoryUsed,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Mapped_TotalCapacity,host=flink-jobmanager
jobmanager_Status_JVM_Memory_NonHeap_Committed,host=flink-jobmanager
jobmanager_Status_JVM_Memory_NonHeap_Max,host=flink-jobmanager
jobmanager_Status_JVM_Memory_NonHeap_Used,host=flink-jobmanager
jobmanager_Status_JVM_Threads_Count,host=flink-jobmanager
jobmanager_numRegisteredTaskManagers,host=flink-jobmanager
jobmanager_numRunningJobs,host=flink-jobmanager
jobmanager_taskSlotsAvailable,host=flink-jobmanager
jobmanager_taskSlotsTotal,host=flink-jobmanager
taskmanager_Status_JVM_CPU_Load,host=flink-taskmanager-6484bdf6c5-6v5q2,tm_id=4f62a2c1108e00ca0fdbcbb49f068717
taskmanager_Status_JVM_CPU_Time,host=flink-taskmanager-6484bdf6c5-6v5q2,tm_id=4f62a2c1108e00ca0fdbcbb49f068717
taskmanager_Status_JVM_ClassLoader_ClassesLoaded,host=flink-taskmanager-6484bdf6c5-6v5q2,tm_id=4f62a2c1108e00ca0fdbcbb49f068717
taskmanager_Status_JVM_ClassLoader_ClassesUnloaded,host=flink-taskmanager-6484bdf6c5-6v5q2,tm_id=4f62a2c1108e00ca0fdbcbb49f068717
taskmanager_Status_JVM_GarbageCollector_G1_Old_Generation_Count,host=flink-taskmanager-6484bdf6c5-6v5q2,tm_id=4f62a2c1108e00c

Re: Flink TaskManager Logs polluted with InfluxDB metrics reporter errors

2020-02-06 Thread Morgan Geldenhuys
717
taskmanager_Status_Shuffle_Netty_TotalMemorySegments,host=flink-taskmanager-6484bdf6c5-6v5q2,tm_id=4f62a2c1108e00ca0fdbcbb49f068717

As the job continues this list of series just gets longer and longer.

It was working perfectly fine a few months ago, but I have no idea what is happening now. Any ideas?


On 06.02.20 10:23, Chesnay Schepler wrote:

What InfluxDB version are you using?


Flink TaskManager Logs polluted with InfluxDB metrics reporter errors

2020-02-05 Thread Morgan Geldenhuys


I am trying to set up metrics reporting for Flink using InfluxDB; however, I am receiving tons of exceptions (listed right at the bottom).


Reporting is set up as recommended by the documentation:

metrics.reporter.influxdb.class: 
org.apache.flink.metrics.influxdb.InfluxdbReporter
metrics.reporter.influxdb.host: influxdb
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink
metrics.reporter.influxdb.username: ***
metrics.reporter.influxdb.password: ***

Any hints at what would cause all these exceptions?

2020-02-05 18:15:17,777 WARN org.apache.flink.runtime.metrics.MetricRegistryImpl - Error while reporting metrics
org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException$UnableToParseException: 
partial write: unable to parse 
'taskmanager_job_task_operator_partition-revoked-latency-max,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ 
Custom\ 
Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ 
Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 
value=� 158092651772000': invalid boolean
unable to parse 
'taskmanager_job_task_operator_KafkaConsumer_heartbeat-response-time-max,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ 
Custom\ 
Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ 
Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 
value=� 158092651772000': invalid boolean
unable to parse 
'taskmanager_job_task_operator_KafkaConsumer_sync-time-max,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ 
Custom\ 
Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ 
Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 
value=� 158092651772000': invalid boolean
unable to parse 
'taskmanager_job_task_operator_KafkaConsumer_reauthentication-latency-avg,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ 
Custom\ 
Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ 
Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 
value=� 158092651772000': invalid boolean
unable to parse 
'taskmanager_job_task_operator_rebalance-latency-avg,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ 
Custom\ 
Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ 
Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 
value=� 158092651772000': invalid boolean
unable to parse 
'taskmanager_job_task_operator_reauthentication-latency-avg,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ 
Custom\ 
Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ 
Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 
value=� 158092651772000': invalid boolean
unable to parse 
'taskmanager_job_task_operator_KafkaConsumer_reauthentication-latency-max,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ 
Custom\ 
Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ 
Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 
value=� 158092651772000': invalid boolean
unable to parse 
'taskmanager_job_task_operator_partition-revoked-latency-avg,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ 
Custom\ 
Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ 
Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 
value=� 158092651772000': invalid boolean
unable to parse 
'taskmanager_job_task_operator_KafkaConsumer_join-time-avg,

Question: Determining Total Recovery Time

2020-02-03 Thread Morgan Geldenhuys

Community,

I am interested in determining the total time to recover for a Flink 
application after experiencing a partial failure. Let's assume a 
pipeline consisting of Kafka -> Flink -> Kafka with Exactly-Once 
guarantees enabled.


Taking a look at the documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html), 
one of the metrics which can be gathered is /recoveryTime/. However, as 
far as I can tell this is only the time taken for the system to go from 
an inconsistent state back into a consistent state, i.e. restarting the 
job. Is there any way of measuring the amount of time taken from the 
point when the failure occurred till the point when the system catches 
up to the last message that was processed before the outage?


Thank you very much in advance!

Regards,
Morgan.