[Table API] [JDBC CDC] Caching Configuration from MySql instance needed in multiple Flink Jobs

2022-02-21 Thread Dan Serb
Hello all,

I kind of need the community’s help with some ideas, as I’m quite new with 
Flink and I feel like I need a little bit of guidance in regard to an 
implementation I’m working on.

What I need to do is to have a way to store a MySQL table in Flink and expose 
that data to other jobs, as I need to query the data to enrich some records 
received on a Kafka source.

The initial solution I’m working on now is:


  1.  Have a processor that uses the Flink JDBC CDC connector over the table that 
stores the information I need. (This is implemented and currently working.)
  2.  Find a way to store that stream source in a table inside Flink. (I tried 
the approach of creating a MySQL JDBC catalog, but apparently I can only create a 
Postgres catalog programmatically.) This is the question: what API do I need to 
use to store the data retrieved by the CDC source in a SQL table inside Flink? 
(See the sketch after this list.)
  3.  The solution from point 2 needs to be done in a way that lets me query that 
table for each record I receive in a different job that has a Kafka source as the 
entry point.
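
For point 2, the closest thing I can picture is registering the MySQL table 
through the Table API as a dynamic table backed by the 'mysql-cdc' connector. 
This is only a minimal sketch of what I mean, assuming the 
flink-connector-mysql-cdc dependency is on the classpath; the table name, 
columns and connection details are placeholders:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Register the MySQL table as a dynamic table backed by the CDC connector.
tEnv.executeSql(
    "CREATE TABLE enrichment_config ("
        + "  id BIGINT,"
        + "  config_key STRING,"
        + "  config_value STRING,"
        + "  PRIMARY KEY (id) NOT ENFORCED"
        + ") WITH ("
        + "  'connector' = 'mysql-cdc',"
        + "  'hostname' = 'mysql-host',"
        + "  'port' = '3306',"
        + "  'username' = 'flink',"
        + "  'password' = 'secret',"
        + "  'database-name' = 'mydb',"
        + "  'table-name' = 'enrichment_config'"
        + ")");

// The changelog-backed table can now be queried with regular SQL.
tEnv.executeSql("SELECT * FROM enrichment_config").print();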

I was thinking about having the CDC source inside the job that has the Kafka 
source, and I’m going to test whether that is feasible as we speak. The idea is 
that I need to look up some information from the MySQL database each time I 
process a record from the Kafka source – would this be a good option if I’m able 
to persist the data into a temporary view inside the processor? I’m just worried 
that I might need to reuse these data sets from the SQL database in future jobs, 
which is why I’d like to have something decoupled and available to the entire 
cluster.
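
To make the enrichment part concrete, what I have in mind is roughly the 
following, continuing the sketch above and assuming both the Kafka stream 
(registered here as a hypothetical kafka_events table) and the CDC-backed table 
live in the same TableEnvironment; the column names are placeholders:

// Enrich each Kafka event with the matching row from the CDC-backed table.
Table enriched = tEnv.sqlQuery(
    "SELECT e.event_id, e.payload, c.config_value "
        + "FROM kafka_events AS e "
        + "JOIN enrichment_config AS c "
        + "ON e.config_id = c.id");

// Convert back to a DataStream if the rest of the pipeline uses the DataStream API.
DataStream<Row> enrichedStream = tEnv.toChangelogStream(enriched);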

Like I said, I’m new to Flink, and it’s proven quite difficult for me to 
understand exactly what the best solution would be in my situation. That is why 
I’m asking users who might have more experience with this and who might have run 
into the same issues in the past.

Thank you in advance, guys!

Regards,
Dan Serb



FlinkKafkaProducer - Avro - Schema Registry

2022-04-05 Thread Dan Serb
Hi guys,

I’m working on a solution where I ingest Kafka records and I need to sink them 
to another topic using Avro and Schema Registry.
The problem I’m facing is that I can’t find a suitable configuration that 
actually works for me.

I’m going to explain.


  1.  I have a KafkaSource that consumes the initial stream of data.
  2.  I have an operator that maps the Kafka records to Avro objects (Java POJOs 
generated using the mvn avro plugin, based on .avsc files).
  3.  I register the schemas in Schema Registry using the mvn 
schema-registry:register plugin/goal (registering the schema type as AVRO).
  4.  I have a FlinkKafkaProducer where I provide a serialization schema of type 
ConfluentRegistryAvroSerializationSchema.

My Kafka Properties for the Producer:

kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
kafkaProps.put(
    KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
    "http://schemaregistry:38081");
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
kafkaProps.put("auto.register.schemas", false);
kafkaProps.put("use.latest.version", true);

As I learned from other tutorials/articles, I basically need to use 
KafkaAvroSerializer.class for ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG.
This eventually brings me into KafkaAvroSerializer, which, based on what the 
record actually looks like, determines the schema, goes to the Schema Registry to 
fetch the schema for that record, and serializes the record before it gets sent.
The problem I’m having is that in the FlinkKafkaProducer class, in the invoke() 
method, keyedSchema is null in my case but kafkaSchema is not null, and it 
basically does a ‘pre-serialization’ that transforms my record into a byte[]. 
This has an effect when it ends up in the KafkaAvroSerializer: since the record 
is already a byte[], it resolves a schema of type “bytes” instead of the schema I 
have for that SpecificRecord. And when it brings the proper schema from the 
Schema Registry, it fails for not being compatible: Schema {} is not compatible 
with schema of type “bytes”.

For more context, this is how my Processor looks at this moment.


DataStream<JsonNode> kafkaRecords =
    env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka");

SingleOutputStreamOperator<AvroObject> producedRecords =
    kafkaRecords
        .map(
            value -> {
              String kafkaKey = value.get(KEY).asText();
              String kafkaRecordJson = MAPPER.writeValueAsString(value.get(VALUE));
              return Converter.convert(kafkaKey, kafkaRecordJson);
            })
        .returns(TypeInformation.of(AvroObject.class));

AvroSerializationSchema<AvroObject> schema =
    ConfluentRegistryAvroSerializationSchema.forSpecific(AvroObject.class);

FlinkKafkaProducer<AvroObject> kafkaProducer =
    new FlinkKafkaProducer<>("sink_topic", schema, kafkaProps);

producedRecords.addSink(kafkaProducer);

env.execute();

Exception:
Caused by: java.io.IOException: Incompatible schema {avro schema here} with refs 
[] of type AVRO for schema "bytes".

PS: If I remove the KafkaAvroSerializer from the producer properties, it works 
fine, but when I consume the messages, the first message gets consumed with only 
default values in the record, and the second message throws an EOFException – I 
could not yet debug the exact cause. It seems that when I don’t have the 
KafkaAvroSerializer, it is not actually going to the Schema Registry to get the 
schema back and use it for serialization, so I definitely need to have that 
there. But I still think I need to make some more config changes, maybe in other 
places, because it’s definitely not working as expected.

Thanks a lot!
I would appreciate at least some pointers on where to investigate further, and if 
someone else has a similar implementation, maybe some tips and tricks.

Regards,
Dan Serb




Re: FlinkKafkaProducer - Avro - Schema Registry

2022-04-07 Thread Dan Serb
Hello Qingsheng,

Removing KafkaAvroSerializer from the producer properties worked, indeed.
I validated this by consuming with a FlinkKafkaConsumer that uses 
ConfluentRegistryAvroDeserializationSchema, so it's working properly.

The problem I'm still having is that I will have to use a Schema Registry where 
multiple types of schemas are registered for messages arriving on the same Kafka 
topic.
That means I will need the behavior that KafkaAvroSerializer.class provides, 
which is going to the Schema Registry to look up the schema by subject.

By only using ConfluentRegistryAvroSerializationSchema.forSpecific() in my 
FlinkKafkaProducer, I don't see how I can get that functionality; I debugged it 
internally, and it doesn't seem to go through the path I would like it to.

So, in conclusion, I think I still somehow need the KafkaAvroSerializer in the 
producer properties to force the serialization to go through the Schema Registry 
– something like the sketch below is what I'm considering.
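
To be clear about the direction I mean, this is only a sketch, not something I 
have running: a custom KafkaSerializationSchema that wraps Confluent's 
KafkaAvroSerializer, so the subject lookup still happens through the Schema 
Registry while Flink hands plain bytes to the producer. The class name and 
config values are placeholders.

import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RegistryAwareAvroSerializationSchema<T extends SpecificRecord>
        implements KafkaSerializationSchema<T> {

    private final String topic;
    private final Map<String, Object> registryConfig;
    private transient KafkaAvroSerializer valueSerializer;

    public RegistryAwareAvroSerializationSchema(String topic, String schemaRegistryUrl) {
        this.topic = topic;
        this.registryConfig = new HashMap<>();
        registryConfig.put("schema.registry.url", schemaRegistryUrl);
        // Look up existing schemas instead of auto-registering new ones.
        registryConfig.put("auto.register.schemas", false);
        registryConfig.put("use.latest.version", true);
        // Allow multiple record types on the same topic.
        registryConfig.put("value.subject.name.strategy",
                "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T element, Long timestamp) {
        if (valueSerializer == null) {
            // KafkaAvroSerializer is not serializable, so create it lazily on the task.
            valueSerializer = new KafkaAvroSerializer();
            valueSerializer.configure(registryConfig, false);
        }
        return new ProducerRecord<>(topic, valueSerializer.serialize(topic, element));
    }
}

The producer would then be created with plain producer properties (no 
value.serializer set), e.g. new FlinkKafkaProducer<>("sink_topic", new 
RegistryAwareAvroSerializationSchema<>("sink_topic", 
"http://schemaregistry:38081"), kafkaProps, 
FlinkKafkaProducer.Semantic.AT_LEAST_ONCE).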

Regards,
Dan Serb

On 08.04.2022, 05:23, "Qingsheng Ren"  wrote:

Hi Dan,

In FlinkKafkaProducer, records are serialized by the SerializationSchema 
specified in the constructor, which is the “schema” 
(ConfluentRegistryAvroSerializationSchema.forSpecific(AvroObject.class)) in 
your case, instead of the serializer specified in producer properties. The 
default serializer used by FlinkKafkaProducer is ByteArraySerializer, so the 
flow of serialization would be:

[AvroObject] -> SerializationSchema -> [Bytes] -> ByteArraySerializer -> 
[Bytes]

So I think removing KafkaAvroSerializer from the producer config and using the 
AvroSerializationSchema is the right way. As you mentioned that messages could 
not be consumed back successfully, could you provide more information about how 
you consume messages from Kafka (e.g. using Flink's KafkaSource or just a plain 
KafkaConsumer, and maybe also the configuration you are using)?
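
For reference, a minimal sketch of consuming the records back with Flink's 
KafkaSource and the Confluent Avro deserialization schema would look roughly 
like the following; the bootstrap servers, topic, group id and registry URL are 
placeholders taken from your snippet:

KafkaSource<AvroObject> source =
    KafkaSource.<AvroObject>builder()
        .setBootstrapServers("kafka:9092")
        .setTopics("sink_topic")
        .setGroupId("avro-consumer")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(
            // Deserialize using the writer schema fetched from Schema Registry.
            ConfluentRegistryAvroDeserializationSchema.forSpecific(
                AvroObject.class, "http://schemaregistry:38081"))
        .build();

DataStream<AvroObject> records =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "avro-source");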

Best regards,

Qingsheng

