Re: Issues running multiple Jobs using the same JAR
That solved it, thank you very much Kezhu :)

On 28.02.21 16:12, Kezhu Wang wrote:
Hi Morgan,

You could check FLINK-11654; from its description, I think it is the problem you encountered.

> We run multiple jobs on a cluster which write a lot to the same Kafka topic from identically named sinks. When EXACTLY_ONCE semantic is enabled for the KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go into a restart cycle.

FLINK-11654: https://issues.apache.org/jira/browse/FLINK-11654

Best,
Kezhu Wang

On February 28, 2021 at 22:35:02, Morgan Geldenhuys (morgan.geldenh...@tu-berlin.de) wrote:
Greetings all,

I am having an issue instantiating multiple Flink jobs using the same JAR in the same Flink native cluster (all 1.12.1). When processing events, the jobs fail with the following trace:

org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
[...]
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
[...]
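The failure mode in FLINK-11654 comes down to clashing Kafka transactional IDs: as far as I can tell, the exactly-once FlinkKafkaProducer derives them from the sink's task name and operator UID, so two jobs with identically named sinks end up fencing each other. Below is a minimal sketch of the usual way out, giving each deployment's sink a distinct name and uid; jobName, the topic, and the broker address are placeholders, not anything prescribed by the Flink API.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class UniqueSinkJob {
    public static void main(String[] args) throws Exception {
        // Hypothetical per-deployment identifier, e.g. passed as the first program argument.
        final String jobName = args.length > 0 ? args[0] : "job-a";
        final String topic = "shared-output-topic"; // placeholder topic

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // exactly-once sinks only commit on checkpoints

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder broker
        // Must not exceed the broker's transaction.max.timeout.ms.
        props.setProperty("transaction.timeout.ms", "900000");

        DataStream<String> stream = env.fromElements("a", "b", "c"); // stand-in for the real pipeline

        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                topic,
                (KafkaSerializationSchema<String>) (element, timestamp) ->
                        new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8)),
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        // The important part: a name/uid that is unique per deployment.
        stream.addSink(producer)
              .name("kafka-sink-" + jobName)
              .uid("kafka-sink-" + jobName);

        env.execute(jobName);
    }
}

With distinct uids per deployment, the generated transactional-id prefixes no longer overlap, so a commit from one job should no longer fence the other job's producers.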
Issues running multiple Jobs using the same JAR
Greetings all,

I am having an issue instantiating multiple Flink jobs using the same JAR in the same Flink native cluster (all 1.12.1). When processing events, the jobs fail with the following trace:

org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
    at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
    at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.send(FlinkKafkaInternalProducer.java:133)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:915)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:187)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.base/java.lang.Thread.run(Unknown Source)
    Suppressed: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:965)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
        ... 3 more
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    Suppressed: java.lang.IllegalStateException: Pending record count must be zero at this point: 1
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1090)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:925)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.base/java.lang.Thread.run(Unknown Source)
    Suppressed: java.lang.IllegalStateException: Pending record count must be zero at this point: 1
        at org
Request: Documentation for External Communication with Flink Cluster
Hi Community,

I am interested in creating an external client for submitting and managing Flink jobs via an HTTP/REST endpoint. Taking a look at the documentation, external communication is possible with the Dispatcher and JobManager (https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-ssl.html#external--rest-connectivity). That is great; however, where is the documentation for the REST API itself?

Thanks in advance!

Regards,
M.
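For anyone searching later: the REST endpoints are documented in the monitoring section of the docs, at https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html. As a quick smoke test, here is a minimal Java sketch that lists the jobs on a cluster, assuming the JobManager is reachable locally on the default rest.port of 8081:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class ListJobs {
    public static void main(String[] args) throws Exception {
        // Assumed JobManager REST address; 8081 is the default rest.port.
        URL url = new URL("http://localhost:8081/jobs");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                // Expected shape: {"jobs":[{"id":"<job-id>","status":"RUNNING"}, ...]}
                System.out.println(line);
            }
        }
    }
}

The same endpoint family also covers jar upload (POST /jars/upload) and job submission (POST /jars/:jarid/run), which is what an external submission client would build on.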
Flink on Kubernetes unable to Recover from failure
Community,

I am currently doing some fault tolerance testing for Flink (1.10) running on Kubernetes (1.18) and am encountering an error where, after a running job experiences a failure, the job fails completely.

A Flink session cluster has been created according to the documentation here: https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html. The job is then uploaded and deployed via the web interface and everything runs smoothly. The job has a parallelism of 24 with 3 worker nodes as failovers in reserve. Each worker is assigned one task slot (27 in total).

The next step is to inject an error, for which I use the Pumba chaos testing tool (https://github.com/alexei-led/pumba) to pause a random worker process. This selection and pausing is done manually for the moment. Looking at the error logs, Flink does detect the error after the timeout (the heartbeat timeout has been set to 20 seconds):

java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 768848f91ebdbccc8d518e910160414d timed out.

After the failure has been detected, the system resets to the latest saved checkpoint and restarts. The system catches up nicely and resumes normal processing... however, after about 3 minutes, the following error occurs:

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager '/10.45.128.1:6121'. This might indicate that the remote task manager was lost.

The job fails and is unable to restart because the number of task slots has been reduced to zero. Looking at the Kubernetes cluster, all containers are running... Has anyone else run into this error? What am I missing? The same thing happens when the containers are deleted.

Regards,
M.
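For reference, the fault injection described above is a single Pumba command; a sketch, with the target container name as a placeholder (Pumba resolves it against the local Docker daemon on the node):

pumba pause --duration 30s <taskmanager-container>

Pausing for longer than the configured 20 s heartbeat timeout is what forces the JobManager to declare the TaskManager dead.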
A Strategy for Capacity Testing
Community,

I am interested in knowing the recommended way of capacity-planning a particular Flink application given its current resource allocation. Taking a look at the Flink documentation (https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#capacity-planning), extra resources need to be allocated on top of what has already been assigned for normal operations, for when failures occur. The amount of extra resources determines how quickly the application can catch up to the head of the input stream, e.g. Kafka, considering event-time processing.

So, as far as I know, the recommended way of testing the maximum capacity of the system is to slowly increase the ingestion rate to find the point just before backpressure would kick in. Would a strategy of starting the job at a timestamp far enough in the past that the system is forced to catch up for a few minutes, and then taking an average measurement of the ingress rate over this time, be sufficient for determining the maximum number of messages that can be processed?

Thank you in advance! Have a great day!

Regards,
M.
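A minimal sketch of that catch-up experiment, assuming the universal Kafka consumer and placeholder broker/topic names; the consumer is pointed ten minutes into the past so the job runs at full tilt until it reaches the head of the topic:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class CatchUpProbe {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder broker
        props.setProperty("group.id", "capacity-probe");

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "events", new SimpleStringSchema(), props); // placeholder topic

        // Start 10 minutes in the past so the job must catch up at maximum speed;
        // the average source throughput over the catch-up phase approximates capacity.
        consumer.setStartFromTimestamp(System.currentTimeMillis() - 10 * 60 * 1000);

        env.addSource(consumer)
           .print(); // stand-in for the real pipeline

        env.execute("capacity-probe");
    }
}

While it catches up, averaging the source's numRecordsOutPerSecond metric over the window should give the rough capacity figure the strategy is after.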
Failure detection and Heartbeats
Hi community,

I am interested in knowing more about the failure detection mechanism used by Flink; unfortunately, information is a little thin on the ground, and I was hoping someone could shed a little light on the topic.

Looking at the documentation (https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html), there are these two configuration options:

heartbeat.interval: 10000 (Long) - Time interval for requesting heartbeat from sender side.
heartbeat.timeout: 50000 (Long) - Timeout for requesting and receiving heartbeat for both sender and receiver sides.

This would indicate Flink uses a heartbeat mechanism to ascertain the liveness of TaskManagers. From this, the following assumptions are made: the JobManager is responsible for broadcasting heartbeat requests to all TaskManagers and awaits responses. If a response is not forthcoming from any particular node within the heartbeat timeout period, i.e. 50 seconds by default, then that node is timed out and assumed to have failed. The heartbeat interval indicates how often the heartbeat request broadcast is scheduled. Having the heartbeat interval shorter than the heartbeat timeout means that multiple requests can be underway at the same time; therefore, the TaskManager would need to fail to respond to 4 requests (assuming normal response times are lower than 10 seconds) before being timed out after 50 seconds.

So, if a failure were to occur (considering the default settings):

- In the best case, the JobManager would detect the failure in the shortest time, i.e. ~50 seconds (the node fails just before receiving the next heartbeat request).
- In the worst case, the JobManager would detect the failure in the longest time, i.e. ~60 seconds (the node fails just after sending the last heartbeat response).

Is this correct? For JobManagers in HA mode, this is left to ZooKeeper timeouts, which then initiate a round of elections, and the new leader picks up from the previous checkpoint.

Thank you in advance.

Regards,
M.
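For reference, the two options as they would appear in flink-conf.yaml, with the defaults written out in milliseconds (a sketch; lowering them speeds up detection at the cost of more false positives on a loaded cluster):

heartbeat.interval: 10000   # how often heartbeat requests are scheduled (ms, default)
heartbeat.timeout: 50000    # how long to wait before marking the peer as failed (ms, default)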
Re: How to determine average utilization before backpressure kicks in?
Hi Roman,

Thank you for the reply. Yes, I am aware that backpressure can be the result of many factors, and yes, this is an oversimplification of something very complex, so please bear with me. Let's assume that this has been taken into account and has lowered the threshold for when this status permanently comes into effect, i.e. HIGH.

Example: the system is running along perfectly fine under normal conditions, accessing external sources, and processing at an average of 100,000 messages/sec. Let's assume the maximum capacity is around 130,000 messages/sec before backpressure starts propagating back up the stream. Utilization is therefore roughly 0.77 (100K/130K). Great, but at present we don't know that 130,000 is the limit.

For this example, or for any job, is there a way of finding this maximum capacity (and hence the utilization) without pushing the system to its limit based on the current throughput? Possibly by measuring (as you say) the saturation of certain buffers (looking into this now; however, I am not too familiar with Flink internals)? It doesn't have to be extremely precise. Any hints would be greatly appreciated.

Regards,
M.

On 25.02.20 13:34, Khachatryan Roman wrote:
Hi Morgan,

Regarding backpressure, it can be caused by a number of factors, e.g. writing to an external system or slow input partitions. However, if you know that a particular resource is a bottleneck, then it makes sense to monitor its saturation. It can be done by using Flink metrics. Please see the documentation for more details: https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html

Regards,
Roman

On Tue, Feb 25, 2020 at 12:33 PM Morgan Geldenhuys (morgan.geldenh...@tu-berlin.de) wrote:
Hello community,

I am fairly new to Flink and have a question concerning utilization. [...]
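Following Roman's pointer at the metrics: the network buffer pool usage gauges are the usual proxy for the buffer saturation mentioned above. A sketch of a Prometheus query, assuming the Prometheus reporter's default metric naming; a task whose outPoolUsage sits close to 1.0 is a strong candidate for the bottleneck:

max by (task_name) (flink_taskmanager_job_task_buffers_outPoolUsage)

The ratio of the current input rate to the rate the bottleneck task sustains when its buffers are saturated gives the rough utilization estimate asked about here.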
How to determine average utilization before backpressure kicks in?
Hello community,

I am fairly new to Flink and have a question concerning utilization; I was hoping someone could help. Backpressure is essentially the point at which utilization has reached 100% for any particular streaming pipeline, meaning that the application cannot "keep up" with the messages coming into the system. I was wondering, assuming a fairly stable input throughput, is there a way of determining the average utilization as a percentage? Can we determine how much more capacity each operator has before backpressure kicks in from metrics alone, i.e. 60% of capacity for example? Knowing that the maximum throughput of the DSP application is dictated by the slowest part of the pipeline, we would need to identify the slowest operator and then average horizontally.

The only method I can see of determining the point at which the system cannot keep up any longer is scaling the input throughput slowly until the backpressure HIGH alarm is shown, at which point the number of messages/sec is known. Yes, I know this is a gross oversimplification and there are many, many factors that need to be taken into account when dealing with backpressure, but it would be nice to have a general indicator; a rough estimate is fine.

Thank you in advance.

Regards,
M.
Identifying Flink Operators of the Latency Metric
Hi All,

I have set up monitoring for Flink (1.9.2) via Prometheus and am interested in viewing the end-to-end latency at the sink operators for the 95th percentile. I have enabled latency markers at the operator level and can see the results; one of the entries looks as follows:

flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{
  app="flink",
  component="taskmanager",
  host="flink_taskmanager_6bdc8fc49_kr4bs",
  instance="10.244.18.2:",
  job="kubernetes-pods",
  job_id="96d32d8e380dc267bd69403fd7e20adf",
  job_name="Traffic",
  kubernetes_namespace="default",
  kubernetes_pod_name="flink-taskmanager-6bdc8fc49-kr4bs",
  operator_id="2e32dc82f03b1df764824a4773219c97",
  operator_subtask_index="7",
  pod_template_hash="6bdc8fc49",
  quantile="0.95",
  source_id="cbc357ccb763df2852fee8c4fc7d55f2",
  tm_id="7fb02c0ed734ed1815fa51373457434f"
}

That is great; however, I am unable to determine which of the operators is the sink operator I am looking for based solely on the operator_id. Is there a way of determining this?

Regards,
M.
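One hedged way around this: if I read the job graph generation correctly, when an operator heads its own chain, its operator id coincides with the JobVertexID that the web UI and REST API report together with the operator's name. Forcing the sink out of any chain therefore makes its operator_id decodable. A sketch with placeholder names:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class NamedSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
           // Force the sink into its own chain: its operator id should then match the
           // vertex id that GET /jobs/<job-id> reports alongside the name "my-sink".
           .addSink(new PrintSinkFunction<>())
           .name("my-sink")
           .disableChaining();

        env.execute("named-sink-example");
    }
}

Alternatively, setting an explicit .uid(...) on the sink keeps its generated operator id stable across submissions, so the id-to-name mapping only has to be worked out once.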
Re: Question: Determining Total Recovery Time
Thanks for the advice, I will look into it.

I had a quick think about another simple solution, but we would need a hook into the checkpoint process from the task/operator perspective, which I haven't looked into yet. It would work like this:

- The sink operators (?) would keep a local copy of the last message processed (or a digest), the current timestamp, and a boolean value indicating whether or not the system is in recovery.
- While not in recovery, update the local copy and timestamp with each new event processed.
- When a failure is detected and the taskmanagers are notified to roll back, we use the hook into this process to switch the boolean value to true.
- While true, it compares each new message with the last one processed before the recovery process was initiated.
- When a match is found, the difference between the previous and current timestamp is calculated and output as a custom metric, and the boolean is reset to false.

From here, the mean total recovery time could be calculated across the operators. Not sure how it would impact performance, but I doubt it would be significant. We would need to ensure exactly-once so that the message would be guaranteed to be seen again. Thoughts? (A rough sketch of this idea follows at the end of this message.)

On 11.02.20 08:57, Arvid Heise wrote:
Hi Morgan,

as Timo pointed out, there is no general solution, but in your setting, you could look at the consumer lag of the input topic after a crash. Lag would spike until all tasks restarted and reprocessing begins. Offsets are only committed on checkpoints though by default.

Best,
Arvid

On Tue, Feb 4, 2020 at 12:32 PM Timo Walther (twal...@apache.org) wrote:
Hi Morgan,

as far as I know this is not possible, mostly because measuring "till the point when the system catches up to the last message" is very pipeline/connector dependent. Some sources might need to read from the very beginning, some just continue from the latest checkpointed offset. Measuring things like that (e.g. for experiments) might require collecting your own metrics as part of your pipeline definition.

Regards,
Timo

On 03.02.20 12:20, Morgan Geldenhuys wrote:
> Community,
>
> I am interested in determining the total time to recover for a Flink
> application after experiencing a partial failure. [...]
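A hedged sketch of the mechanism proposed above, as a wrapper sink: it remembers the last value seen in operator state, and after a restore it measures how long it takes until that value shows up again, exposing the result as a gauge. Everything here (class and metric names, String records) is illustrative, and it assumes the remembered record is actually replayed after the rollback:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class RecoveryTimeSink extends RichSinkFunction<String> implements CheckpointedFunction {

    private transient ListState<String> lastSeenState;
    private String lastSeen;
    private boolean recovering;
    private long recoveryStartMillis;
    private volatile long catchUpMillis;

    @Override
    public void open(Configuration parameters) {
        // Expose the measured catch-up time as a custom metric.
        getRuntimeContext().getMetricGroup().gauge("catchUpTimeMs", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return catchUpMillis;
            }
        });
    }

    @Override
    public void invoke(String value, Context context) {
        if (recovering && value.equals(lastSeen)) {
            // Caught up to the last record seen before the failure.
            catchUpMillis = System.currentTimeMillis() - recoveryStartMillis;
            recovering = false;
        }
        lastSeen = value;
        // ... forward the value to the real sink here ...
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Remember the last record as of this checkpoint.
        lastSeenState.clear();
        if (lastSeen != null) {
            lastSeenState.add(lastSeen);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        lastSeenState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("last-seen", String.class));
        if (context.isRestored()) {
            for (String s : lastSeenState.get()) {
                lastSeen = s;
            }
            recovering = (lastSeen != null);
            recoveryStartMillis = System.currentTimeMillis();
        }
    }
}

Caveat: the clock starts in initializeState, so the measurement covers catch-up from restore onward; the detection time before the restart would still have to be added from the JobManager logs.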
Re: Flink TaskManager Logs polluted with InfluxDB metrics reporter errors
The only difference that I can tell is the Kubernetes cluster was upgraded from 1.14 to 1.17; however, I rolled this back to test, with the same result on the older version.

I've created a stripped-down job which produces the errors:

// create a testing sink
private static class CollectSink implements SinkFunction<String> {
    // must be static
    public static final List<String> values = new ArrayList<>();

    @Override
    public synchronized void invoke(String value) throws Exception {
        values.add(value);
    }
}

public static void main(String[] args) throws Exception {
    Properties props = Resources.GET.read("iot_traffic_processor.properties", Properties.class);

    // setup Kafka consumer
    Properties kafkaConsumerProps = new Properties();
    kafkaConsumerProps.setProperty("bootstrap.servers", props.getProperty("kafka.brokerList"));
    kafkaConsumerProps.setProperty("group.id", props.getProperty("kafka.group"));
    FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
            props.getProperty("kafka.consumerTopic"),
            new SimpleStringSchema(),
            kafkaConsumerProps);

    // set up streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setAutoWatermarkInterval(1000L);

    // create direct kafka stream
    DataStream<String> trafficEventStream = env.addSource(myConsumer);

    DataStream<String> mapStream = trafficEventStream
            .map(new RichMapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    return value;
                }
            });

    mapStream.addSink(new CollectSink());

    env.execute("Traffic");
}

Dependencies have also been stripped down to a minimum:

- org.apache.flink:flink-java:1.9.0 (provided)
- org.apache.flink:flink-streaming-scala_2.12:1.9.0 (provided)
- org.apache.flink:flink-streaming-java_2.12:1.9.0 (provided)
- org.apache.flink:flink-table-uber_2.12:1.9.0 (provided)
- org.apache.flink:flink-connector-kafka_2.12:1.9.0
- org.apache.commons:commons-lang3:3.9
- log4j:log4j:1.2.17
- junit:junit:4.12

On 06.02.20 10:58, Chesnay Schepler wrote:
Setup-wise, are there any differences to what you had a few months ago?

On 06/02/2020 10:40, Morgan Geldenhuys wrote:
Further info: the Flink cluster (1.9) is running on Kubernetes (1.17) with InfluxDB.
I have tried the following images for InfluxDB: docker.io/influxdb:1.6.4 and influxdb:latest. When going into the database and showing the series, there are really weird results:

> show series
key
---
jobmanager_Status_JVM_CPU_Load,host=flink-jobmanager
jobmanager_Status_JVM_CPU_Time,host=flink-jobmanager
jobmanager_Status_JVM_ClassLoader_ClassesLoaded,host=flink-jobmanager
jobmanager_Status_JVM_ClassLoader_ClassesUnloaded,host=flink-jobmanager
jobmanager_Status_JVM_GarbageCollector_Copy_Count,host=flink-jobmanager
jobmanager_Status_JVM_GarbageCollector_Copy_Time,host=flink-jobmanager
jobmanager_Status_JVM_GarbageCollector_MarkSweepCompact_Count,host=flink-jobmanager
jobmanager_Status_JVM_GarbageCollector_MarkSweepCompact_Time,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Direct_Count,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Direct_MemoryUsed,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Direct_TotalCapacity,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Heap_Committed,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Heap_Max,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Heap_Used,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Mapped_Count,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Mapped_MemoryUsed,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Mapped_TotalCapacity,host=flink-jobmanager
jobmanager_Status_JVM_Memory_NonHeap_Committed,host=flink-jobmanager
jobmanager_Status_JVM_Memory_NonHeap_Max,host=flink-jobmanager
jobmanager_Status_JVM_Memory_NonHeap_Used,host=flink-jobmanager
jobmanager_Status_JVM_Threads_Count,host=flink-jobmanager
jobmanager_numRegisteredTaskManagers,host=flink-jobmanager
jobmanager_numRunningJobs,host=flink-jobmanager
jobmanager_taskSlotsAvailable,host=flink-jobmanager
jobmanager_taskSlotsTotal,host=flink-jobmanager
taskmanager_Status_JVM_CPU_Load,host=flink-taskmanager-6484bdf6c5-6v5q2,tm_id=4f62a2c1108e00ca0fdbcbb49f068717
taskmanager_Status_JVM_CPU_Time,host=flink-taskmanager-6484bdf6c5-6v5q2,tm_id=4f62a2c1108e00ca0fdbcbb49f068717
taskmanager_Status_JVM_ClassLoader_ClassesLoaded,host=flink-taskmanager-6484bdf6c5-6v5q2,tm_id=4f62a2c1108e00ca0fdbcbb49f068717
taskmanager_Status_JVM_ClassLoader_ClassesUnloaded,host=flink-taskmanager-6484bdf6c5-6v5q2,tm_id=4f62a2c1108e00ca0fdbcbb49f068717
taskmanager_Status_JVM_GarbageCollector_G1_Old_Generation_Count,host=flink-taskmanager-6484bdf6c5-6v5q2,tm_id=4f62a2c1108e00c
Re: Flink TaskManager Logs polluted with InfluxDB metrics reporter errors
[...]
taskmanager_Status_Shuffle_Netty_TotalMemorySegments,host=flink-taskmanager-6484bdf6c5-6v5q2,tm_id=4f62a2c1108e00ca0fdbcbb49f068717

As the job continues, this list of series just gets longer and longer. It was working perfectly fine a few months ago, but I have no idea what is happening now. Any ideas?

On 06.02.20 10:23, Chesnay Schepler wrote:
What InfluxDB version are you using?

On 05/02/2020 19:41, Morgan Geldenhuys wrote:
I am trying to set up metrics reporting for Flink using InfluxDB; however, I am receiving tons of exceptions (listed right at the bottom). [...]
Flink TaskManager Logs polluted with InfluxDB metrics reporter errors
I am trying to set up metrics reporting for Flink using InfluxDB; however, I am receiving tons of exceptions (listed right at the bottom). Reporting is set up as recommended by the documentation:

metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
metrics.reporter.influxdb.host: influxdb
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink
metrics.reporter.influxdb.username: ***
metrics.reporter.influxdb.password: ***

Any hints at what would cause all these exceptions?

2020-02-05 18:15:17,777 WARN org.apache.flink.runtime.metrics.MetricRegistryImpl - Error while reporting metrics
org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException$UnableToParseException: partial write:
unable to parse 'taskmanager_job_task_operator_partition-revoked-latency-max,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ Custom\ Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 value=� 158092651772000': invalid boolean
unable to parse 'taskmanager_job_task_operator_KafkaConsumer_heartbeat-response-time-max,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ Custom\ Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 value=� 158092651772000': invalid boolean
unable to parse 'taskmanager_job_task_operator_KafkaConsumer_sync-time-max,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ Custom\ Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 value=� 158092651772000': invalid boolean
unable to parse 'taskmanager_job_task_operator_KafkaConsumer_reauthentication-latency-avg,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ Custom\ Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 value=� 158092651772000': invalid boolean
unable to parse 'taskmanager_job_task_operator_rebalance-latency-avg,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ Custom\ Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 value=� 158092651772000': invalid boolean
unable to parse 'taskmanager_job_task_operator_reauthentication-latency-avg,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ Custom\ Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 value=� 158092651772000': invalid boolean
unable to parse 'taskmanager_job_task_operator_KafkaConsumer_reauthentication-latency-max,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ Custom\ Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 value=� 158092651772000': invalid boolean
unable to parse 'taskmanager_job_task_operator_partition-revoked-latency-avg,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ Custom\ Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 value=� 158092651772000': invalid boolean
unable to parse 'taskmanager_job_task_operator_KafkaConsumer_join-time-avg,
Question: Determining Total Recovery Time
Community, I am interested in determining the total time to recover for a Flink application after experiencing a partial failure. Let's assume a pipeline consisting of Kafka -> Flink -> Kafka with Exactly-Once guarantees enabled. Taking a look at the documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html), one of the metrics which can be gathered is /recoveryTime/. However, as far as I can tell this is only the time taken for the system to go from an inconsistent state back into a consistent state, i.e. restarting the job. Is there any way of measuring the amount of time taken from the point when the failure occurred till the point when the system catches up to the last message that was processed before the outage? Thank you very much in advance! Regards, Morgan.