Flink Job Failed With Kafka Exception

2023-10-10 Thread Madan D via user
Hello Team, We are running a Flink pipeline that consumes data from multiple 
topics, but we recently noticed that if one topic has issues (e.g. with a 
partition), the whole Flink pipeline fails, which affects the other topics. 
Is there a way we can keep the Flink pipeline running even after one of the 
topics has an issue? We tried to handle exceptions to make sure the job 
wouldn't fail, but it didn't help.
Caused by: java.lang.RuntimeException: Failed to get metadata for topics

Can you please provide any insights?

Regards,
Madan
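A sketch of one possible mitigation (not from the original thread, and assuming the Flink `KafkaSource` API): build a separate source per topic and union the streams, so each topic can be configured, monitored, or removed independently. Note that under the default failover strategy a failure in any single source still restarts the whole job; full isolation needs separate jobs per topic.

```java
// Sketch: one KafkaSource per topic instead of a single multi-topic source.
// "brokers", "env", and the topic list are placeholders, not from the thread.
List<String> topics = List.of("topic-a", "topic-b", "topic-c");
DataStream<String> merged = null;
for (String topic : topics) {
    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers(brokers)
            .setTopics(topic)                       // one topic per source
            .setGroupId("my-group-" + topic)
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();
    DataStream<String> s = env.fromSource(
            source, WatermarkStrategy.noWatermarks(), "kafka-" + topic);
    merged = (merged == null) ? s : merged.union(s);
}
```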

Re: Metric to capture decoding failure in flink sources

2023-10-10 Thread Mason Chen
Hi Prateek,

I agree, the reader should ideally expose the context to record metrics
about deserialization. One option is to defer deserialization to another
operator, say a RichMapFunction that has access to the RuntimeContext.
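A rough sketch of that approach (my own illustration, untested): let the source emit raw bytes and do the decode in a RichMapFunction, whose open() can register a counter through getRuntimeContext(). "Event" and the decode() helper are placeholders.

```java
// Sketch: defer deserialization to a RichMapFunction so the RuntimeContext
// (and its metric group) is available for a failure counter.
public class DecodingMap extends RichMapFunction<byte[], Event> {
    private transient Counter decodingFailures;

    @Override
    public void open(Configuration parameters) {
        decodingFailures = getRuntimeContext()
                .getMetricGroup()
                .counter("decodingFailures");
    }

    @Override
    public Event map(byte[] raw) throws Exception {
        try {
            return decode(raw);   // user-defined deserialization, placeholder
        } catch (IOException e) {
            decodingFailures.inc();
            return null;          // or route bad records to a side output
        }
    }
}
```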

Best,
Mason

On Mon, Oct 9, 2023 at 12:42 PM Prateek Kohli 
wrote:

> Hi,
>
> I need to get the difference between records which are collected by the
> source and the records which are emitted.
> For eg - If deserialization fails while reading records from kafka, in
> that case I want to expose the difference between records collected from
> Kafka Broker and records emitted from Kafka operator after deserialization
> as a metric.
>
> But I think flink does not provide any such metrics.
>
> In Kafka Source I can have a workaround to get this metric:
>
> I can override the open method from KafkaRecordDeserializationSchema where
> a metric can be added to show decoding failures:
>
> @Override
> public void open(DeserializationSchema.InitializationContext context)
> throws Exception {
>     context.getMetricGroup().gauge("decodingFailures", new Gauge<Integer>() {
>         @Override
>         public Integer getValue() {
>             return decodingFailures;
>         }
>     });
> }
>
> and at the time of deserialization I can increment that counter as below:
>
> @Override
> public void deserialize(ConsumerRecord<byte[], byte[]> record,
>         Collector<T> out)
> {
>     try
>     {
>         //deserialize
>     }
>     catch (IOException | MMException e)
>     {
>         logger.error(String.format("Error received while decoding, in
> partition [%d] for topic [%s] at offset [%d]: %s",
>                 partition, topic, offset, e.toString()));
>
>         decodingFailures++;
>     }
> }
>
> *But there is no such way to implement this in FileSource, as
> SimpleStreamFormat/Reader does not provide access to Context in any way.*
>
> *Is there any way I can get this metric in both File & Kafka Collector or
> any generic way to get this agnostic to what collector is being used?*
>
> Regards,
> Prateek Kohli
>


Running FlinkSQL locally

2023-10-10 Thread Ralph Matthias Debusmann
Hi,

I am trying to run FlinkSQL (through sql-client.sh) and read messages from
Kafka topics.

I downloaded 1.17.1 and extracted it. For the Kafka connectivity, I've
added flink-sql-connector-kafka-1.1.7.jar, flink-connector-kafka-1.1.7.jar
and kafka-clients-2.8.1.jar, and now start the sql-client.sh script, adding
these three libs with the "-l" parameter.

Now I'd like to create a table based on a Kafka topic like this:
CREATE TABLE bla (
id BIGINT,
from_account INT,
to_account INT,
amount DOUBLE,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'bla',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'bla',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'true',
'json.ignore-parse-errors' = 'false'
);


Now when I select * from bla, I get the following error message:

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassCastException: cannot assign instance of
org.apache.kafka.clients.consumer.OffsetResetStrategy to field
org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy
of type
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy
in instance of
org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer

I tried different Kafka client lib versions and also multiple
flink(-sql)-connector-kafka versions, to no avail :(
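One note (my own, not part of the original mail): flink-sql-connector-kafka is an uber jar that already bundles a shaded kafka-clients, so also putting flink-connector-kafka and a standalone kafka-clients jar on the same classpath is a known recipe for shaded-vs-unshaded ClassCastExceptions like the one above. A minimal sketch, under the assumption that the Flink distribution is 1.17.1 and the matching uber jar is used alone:

```shell
# Sketch: start the SQL client with only the uber connector jar on the path.
# The jar version must match the Flink distribution (assumption: 1.17.1).
./bin/sql-client.sh -l lib-kafka/
# where lib-kafka/ contains only:
#   flink-sql-connector-kafka-1.17.1.jar
```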

Best, Ralph M. Debusmann


Announcing public preview of Flink with Azure HDInsight on AKS

2023-10-10 Thread ramkrishna vasudevan
We are super excited to announce the public preview of Flink on Azure, as a
managed platform-as-a-service offering.

You can read more in this blog post, and feel free to reach out to me!

https://techcommunity.microsoft.com/t5/analytics-on-azure-blog/announcing-public-preview-of-apache-flink-with-azure-hdinsight/ba-p/3936611

Thank you

Regards
Ram


File Source Watermark Issue

2023-10-10 Thread Kirti Dhar Upadhyay K via user
Hi Team,

I am using the Flink File Source with a window aggregator as the process 
function, and am stuck on a weird issue.
The file source doesn't seem to be emitting/progressing watermarks, whereas if 
I put a delay (say 100 ms) while extracting the timestamp from the event, it 
works fine.

I found much the same thing in the comments here:
https://stackoverflow.com/questions/68736330/flink-watermark-not-advancing-at-all-stuck-at-9223372036854775808/68743019#68743019
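For reference, one pattern that often helps when a file source holds watermarks back (a sketch of my own, under the assumption that idle splits are the cause; "Event" and "fileSource" are placeholders):

```java
// Sketch: bounded out-of-orderness event-time strategy with an idleness
// timeout, so splits that stop producing records do not block the watermark.
WatermarkStrategy<Event> strategy = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, ts) -> event.getEventTimeMillis())
        .withIdleness(Duration.ofSeconds(10));

DataStream<Event> stream = env
        .fromSource(fileSource, strategy, "file-source");
```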

Can someone help me here?

Regards,
Kirti Dhar