Flink Job Failed With Kafka Exception
Hello Team,

We are running a Flink pipeline that consumes data from multiple topics, but we recently noticed that if one topic has issues (with its partitions, metadata, etc.), the whole Flink pipeline fails, which affects the other topics as well. Is there a way we can make the Flink pipeline keep running even after one of the topics has an issue? We tried to handle exceptions to make sure the job wouldn't fail, but it didn't help.

Caused by: java.lang.RuntimeException: Failed to get metadata for topics

Can you please provide any insights?

Regards,
Madan
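One mitigation, sketched here on the assumption that the "Failed to get metadata" errors are transient, is a bounded restart strategy in flink-conf.yaml so the job retries instead of failing terminally (these are standard Flink restart-strategy keys; the attempt/delay values are placeholders):

```yaml
# flink-conf.yaml (sketch): retry on transient failures such as
# "Failed to get metadata for topics" instead of failing permanently.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 30 s
```

A restart strategy only papers over transient errors, though: a failure in any task still restarts the job, so if one topic can stay broken for a long time, the usual pattern is to split the pipeline into one job (or at least one KafkaSource) per topic group, so that a broken topic only takes down its own job.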
Re: Metric to capture decoding failure in flink sources
Hi Prateek,

I agree, the reader should ideally expose the context to record metrics about deserialization. One option is to defer deserialization to another operator, say a RichMapFunction that has access to the RuntimeContext.

Best,
Mason

On Mon, Oct 9, 2023 at 12:42 PM Prateek Kohli wrote:
> Hi,
>
> I need to get the difference between the records which are collected by the
> source and the records which are emitted.
> For example, if deserialization fails while reading records from Kafka, I
> want to expose the difference between records collected from the Kafka
> broker and records emitted from the Kafka operator after deserialization
> as a metric.
>
> But I think Flink does not provide any such metric.
>
> In the Kafka source I can use a workaround to get this metric: I can
> override the open method of KafkaRecordDeserializationSchema, where a
> metric can be added to show decoding failures:
>
> @Override
> public void open(DeserializationSchema.InitializationContext context) throws Exception {
>     context.getMetricGroup().gauge("decodingFailures", new Gauge<Integer>() {
>         @Override
>         public Integer getValue() {
>             return decodingFailures;
>         }
>     });
> }
>
> and at the time of deserialization I can increment that counter as below:
>
> @Override
> public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out) {
>     try {
>         // deserialize
>     } catch (IOException | MMException e) {
>         logger.error(String.format("Error received while decoding, in partition [%d] for topic [%s] at offset [%d]: %s",
>                 partition, topic, offset, e.toString()));
>         decodingFailures++;
>     }
> }
>
> *But there is no such way to implement this in FileSource, as
> SimpleStreamFormat/Reader does not provide access to the Context in any way.*
>
> *Is there any way I can get this metric in both the File & Kafka sources, or
> any generic way to get this metric agnostic to which source is being used?*
>
> Regards,
> Prateek Kohli
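Mason's suggestion can be made source-agnostic: do the decoding in an operator downstream of the source and count failures there, so the same code works for Kafka, files, or any other connector. The class below is a deliberately Flink-free, simplified stand-in (names are hypothetical) for the body of such a RichMapFunction; in real Flink code the counter would be obtained in open() via getRuntimeContext().getMetricGroup().counter("decodingFailures").

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

// Flink-free sketch: decode downstream of the source and count failures.
// In an actual RichMapFunction, decodingFailures would be a Counter metric
// registered on the RuntimeContext's metric group, not a plain field.
class CountingDeserializer {
    private final AtomicLong decodingFailures = new AtomicLong();

    // Tries to parse the raw bytes as a long; empty Optional on failure.
    Optional<Long> deserialize(byte[] raw) {
        try {
            return Optional.of(Long.parseLong(new String(raw, StandardCharsets.UTF_8).trim()));
        } catch (NumberFormatException e) {
            decodingFailures.incrementAndGet();  // the "records lost to decoding" metric
            return Optional.empty();
        }
    }

    long failures() {
        return decodingFailures.get();
    }
}
```

Because the counting happens after the source emits raw bytes, the metric no longer depends on which connector produced the records.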
Running FlinkSQL locally
Hi,

I am trying to run Flink SQL (through sql-client.sh) and read messages from Kafka topics. I downloaded 1.17.1 and extracted it. For the Kafka connectivity, I've added flink-sql-connector-kafka-1.17.1.jar, flink-connector-kafka-1.17.1.jar and kafka-clients-2.8.1.jar, and now start the sql-client.sh script adding these three libs with the "-l" parameter.

Now I'd like to create a table based on a Kafka topic like this:

CREATE TABLE bla (
    id BIGINT,
    from_account INT,
    to_account INT,
    amount DOUBLE,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'bla',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'bla',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false'
);

Now when I run SELECT * FROM bla, I get the following error message:

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy of type org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy in instance of org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer

I tried different Kafka client lib versions and also multiple flink(-sql)-connector-kafka versions, to no avail :(

Best,
Ralph M. Debusmann
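The shaded vs. unshaded OffsetResetStrategy in that stack trace points at a classpath mix rather than a version problem: flink-sql-connector-kafka is a fat jar that already shades kafka-clients, so also adding flink-connector-kafka plus a separate kafka-clients jar puts a second, unshaded copy of the Kafka classes on the classpath. A sketch of the usual fix (the jar path and exact version below are assumptions; pick the flink-sql-connector-kafka release that matches your Flink distribution):

```shell
# Drop the thin connector and raw kafka-clients jars; load ONLY the fat SQL jar.
# Path and version are placeholders for illustration.
./bin/sql-client.sh -l ./connector-libs/flink-sql-connector-kafka-1.17.1.jar
```

The thin flink-connector-kafka jar (plus your own kafka-clients) is meant for applications packaged with a build tool; for the SQL client, the single shaded jar is the intended setup.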
Announcing public preview of Flink with Azure HDInsight on AKS
We are super excited to announce the public preview of Apache Flink with Azure HDInsight on AKS, as a managed platform as a service. You can read more in this blog post, and feel free to reach out to me!

https://techcommunity.microsoft.com/t5/analytics-on-azure-blog/announcing-public-preview-of-apache-flink-with-azure-hdinsight/ba-p/3936611

Thank you,
Regards,
Ram
File Source Watermark Issue
Hi Team,

I am using the Flink File Source with a window aggregator as a process function, and I am stuck on a weird issue. The file source doesn't seem to be emitting/progressing watermarks, whereas if I put a delay (say 100 ms) while extracting the timestamp from the event, it works fine. I found something similar in the comments here: https://stackoverflow.com/questions/68736330/flink-watermark-not-advancing-at-all-stuck-at-9223372036854775808/68743019#68743019

Can someone help me here?

Regards,
Kirti Dhar
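The -9223372036854775808 in the linked question is Long.MIN_VALUE, the initial watermark. One common cause with FileSource is that a downstream operator's watermark is the minimum over all its input channels, so any source subtask that never receives a file split (for example, source parallelism higher than the number of files) stays at Long.MIN_VALUE and pins the whole pipeline's watermark there. A minimal, Flink-free sketch of that min-combining behaviour:

```java
import java.util.Arrays;

// Sketch of Flink's watermark combination: an operator's current watermark is
// the minimum of the watermarks received on all of its input channels. A
// subtask that was never assigned a split still reports the initial
// Long.MIN_VALUE and therefore holds everything back.
class WatermarkDemo {
    static long combine(long[] subtaskWatermarks) {
        return Arrays.stream(subtaskWatermarks).min().getAsLong();
    }
}
```

If that matches your setup, the usual fixes are WatermarkStrategy#withIdleness(Duration) so idle subtasks stop holding the watermark back, or reducing the source parallelism to the number of splits. Another possibility, mentioned in the linked answer, is that a small file is read faster than the periodic watermark interval (200 ms by default), so no watermark is generated before reading finishes; your 100 ms delay would mask exactly that, which is why it looks like a fix.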