Re: DataStream from kafka topic

2021-04-04 Thread Maminspapin
Thank you all very much!

The problem is solved using the
ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
"http://xxx.xx.xxx.xx:8081") method.

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Proper-way-to-get-DataStream-lt-GenericRecord-gt-td42640.html
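
For anyone else finding this thread: a minimal sketch of that wiring (the
schema, topic name and consumer properties below are placeholders, not my
real setup):

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class GenericRecordJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Reader schema for the topic (placeholder definition).
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "generic-record-job");

        // forGeneric(schema, registryUrl) returns a schema-registry-aware
        // DeserializationSchema<GenericRecord>, so no custom schema class is needed.
        FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
                "my-topic",
                ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://localhost:8081"),
                props);

        DataStream<GenericRecord> stream = env.addSource(consumer);
        stream.print();
        env.execute("generic-record-job");
    }
}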

  

But I want to explore your notes. So many new things for me ))

Thanks!





Re: DataStream from kafka topic

2021-04-01 Thread Arian Rohani
Thank you, Arvid, I was going to suggest something like this also.
We use Testcontainers and the Docker images provided by Ververica to do
exactly this in our team.

I am currently working on a small project on GitHub to start sharing for
use cases like this.
The project will contain some example sources and example sinks together
with a generic Flink application.
I will follow up sometime during the weekend with a PoC. It's super
straightforward to set up and use.

To elaborate a bit more on Arvid's suggestion:

   - Use Testcontainers as a base to configure your integration test.
   - LocalStack is a fully functional Docker container that you can use to
   mock various AWS services. Since it's unclear what sink you're using, I
   just want to throw this out there.
   - Set up two containers abstracting the job manager and task manager
   according to this documentation. If you decide to go with the application
   cluster route, then I suggest setting up the task manager and job manager
   as GenericContainers (a sketch follows after this list). The rationale is
   that if you do everything in docker-compose and use a
   DockerComposeContainer, the application will start before you have a
   chance to mock the data in your source, as the DockerComposeContainer is
   started immediately IIRC (which may be problematic depending on the way
   your application is configured to read from Kafka).
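
A rough sketch of that container setup (Testcontainers with JUnit 5; the
image tags, ports and aliases are assumptions, adjust them to your
environment):

import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

class FlinkSandboxTest {

    // Shared network so the containers can reach each other by alias.
    static final Network NETWORK = Network.newNetwork();

    static final KafkaContainer KAFKA =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
                    .withNetwork(NETWORK)
                    .withNetworkAliases("kafka");

    static final GenericContainer<?> JOB_MANAGER =
            new GenericContainer<>(DockerImageName.parse("flink:1.12.2-scala_2.12"))
                    .withCommand("jobmanager")
                    .withNetwork(NETWORK)
                    .withNetworkAliases("jobmanager")
                    .withExposedPorts(8081)
                    .withEnv("FLINK_PROPERTIES", "jobmanager.rpc.address: jobmanager");

    static final GenericContainer<?> TASK_MANAGER =
            new GenericContainer<>(DockerImageName.parse("flink:1.12.2-scala_2.12"))
                    .withCommand("taskmanager")
                    .withNetwork(NETWORK)
                    .withEnv("FLINK_PROPERTIES", "jobmanager.rpc.address: jobmanager");

    @Test
    void runsAgainstSandbox() throws Exception {
        KAFKA.start();
        JOB_MANAGER.start();
        TASK_MANAGER.start();
        try {
            // Produce test records to KAFKA.getBootstrapServers() here, then
            // run or submit the job under test.
        } finally {
            TASK_MANAGER.stop();
            JOB_MANAGER.stop();
            KAFKA.stop();
        }
    }
}

Starting the containers yourself (instead of via docker-compose) is what
gives you the chance to seed the source topic before the job begins
consuming.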

In fact, one of the major benefits is that you simply configure the source
and sink and run the application outside of Docker (as a
LocalStreamEnvironment).
This enables you to set breakpoints where the application is throwing the
exception, which is especially valuable in circumstances like this where the
stack trace is not super descriptive.

Best,
Arian Rohani



Re: DataStream from kafka topic

2021-04-01 Thread Arvid Heise
Arian gave good pointers, but I'd go even further: you should have ITCases
where you pretty much just execute a mini job with docker-based Kafka and
run it automatically.
I strongly recommend checking out Testcontainers [1]; it makes writing such
a test a really smooth experience.

[1] https://www.testcontainers.org/modules/kafka/
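
A minimal sketch of such an ITCase (image tag, topic and properties are
placeholders; a real test would first produce a bounded set of records and
then assert on the job's output):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

class KafkaSourceITCase {

    @Test
    void readsFromDockerizedKafka() throws Exception {
        try (KafkaContainer kafka =
                new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))) {
            kafka.start();

            // ... write a few test records to "input-topic" with a plain KafkaProducer ...

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", kafka.getBootstrapServers());
            props.setProperty("group.id", "itcase");

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.addSource(new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props))
                    .print();

            // env.execute() would run until cancelled here; in a real ITCase,
            // bound the input or collect results with a test sink and assert on them.
        }
    }
}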


Re: DataStream from kafka topic

2021-03-31 Thread Arian Rohani
The issue at hand is that the record contains an unmodifiable collection
which the Kryo serialiser attempts to modify by first initialising the
object and then adding items to the collection (IIRC).

Caused by: java.lang.UnsupportedOperationException
> at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)


Without knowing the specifics of what exactly you are trying to
deserialise, I can only attempt to give a generic answer, which is to try
something like:


> StreamExecutionEnvironment see =
> StreamExecutionEnvironment.getExecutionEnvironment();
> Class<?> unmodColl =
> Class.forName("java.util.Collections$UnmodifiableCollection");
> see.getConfig().addDefaultKryoSerializer(unmodColl,
> UnmodifiableCollectionsSerializer.class);


An even better approach is to set up a local sandbox environment in Docker
with Kafka and a sink of your choice, and simply run the application
from the main method in debug mode with a breakpoint set right before it
throws the exception.

Kind regards,
Arian Rohani



Re: DataStream from kafka topic

2021-03-31 Thread Matthias Pohl
Ok, it looks like you've found that solution already based on your question
in [1].

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Proper-way-to-get-DataStream-lt-GenericRecord-gt-td42640.html


Re: DataStream from kafka topic

2021-03-31 Thread Matthias Pohl
Hi Maminspapin,
I haven't worked with Kafka/Flink yet, but have you had a look at the docs
about the DeserializationSchema [1]? It
mentions ConfluentRegistryAvroDeserializationSchema. Is this something
you're looking for?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema


Re: DataStream from kafka topic

2021-03-30 Thread Maminspapin
I tried this:

1. Schema (found in stackoverflow)

class GenericRecordSchema implements KafkaDeserializationSchema<GenericRecord> {

    private String registryUrl;
    private transient KafkaAvroDeserializer deserializer;

    public GenericRecordSchema(String registryUrl) {
        this.registryUrl = registryUrl;
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        checkInitialized();
        return (GenericRecord) deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }

    private void checkInitialized() {
        if (deserializer == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
            SchemaRegistryClient client =
                    new CachedSchemaRegistryClient(
                            registryUrl,
                            AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            deserializer = new KafkaAvroDeserializer(client, props);
        }
    }
}

2. Consumer

private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic) {

    return new FlinkKafkaConsumer<>(
            topic,
            new GenericRecordSchema("http://xxx.xx.xxx.xx:8081"),
            getConsumerProperties());
}
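
The consumer is then used from the job roughly like this (a simplified
sketch, the topic and job name are placeholders):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<GenericRecord> stream = env.addSource(getConsumer("my-topic"));
// The error below shows up as soon as a record is forwarded to the chained operator.
stream.print();
env.execute("generic-record-job");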

But when I start the app, the following error happens:

com.esotericsoftware.kryo.KryoException:
java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at

DataStream from kafka topic

2021-03-29 Thread Maminspapin
Hi everyone.

How can I get entries in GenericRecord format from a Kafka topic using the
Schema Registry?
I read this:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
But I can't build it in my code...

Are there any tutorials or examples for deserialising data using
schema.registry.url?

Thanks


