Jordan Moore created FLINK-25711:
------------------------------------
Summary: OOM / buffer-overflow on KafkaIO SpecificRecord
Key: FLINK-25711
URL: https://issues.apache.org/jira/browse/FLINK-25711
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Reporter: Jordan Moore
*Details* - I am trying to consume a generated Avro SpecificRecord subclass with
KafkaIO.read (KafkaIO.write works fine with the same generated class).
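For reference, the write side works for me and looks roughly like this (a sketch, not my exact getWritePipeline; the Product constructor, the explicit AvroCoder, and the raw cast on KafkaAvroSerializer are only there to keep the snippet self-contained and compiling):
{code}
// Sketch of the working write pipeline (uses the generated Product class from the schema below)
Pipeline writePipeline = Pipeline.create(options);
writePipeline
    .apply(Create.of(new Product("sample-name"))          // generated all-args constructor
        .withCoder(AvroCoder.of(Product.class)))          // explicit Avro coder for the specific class
    .apply(KafkaIO.<byte[], Product>write()
        .withBootstrapServers("localhost:9092")
        .withTopic("foobar-2")
        .withKeySerializer(ByteArraySerializer.class)      // no keys
        .withValueSerializer((Class) KafkaAvroSerializer.class) // raw cast just to satisfy the generics
        .withProducerConfigUpdates(ImmutableMap.of("schema.registry.url", "http://localhost:8081"))
        .values());                                         // write values only, null keys
writePipeline.run().waitUntilFinish();
{code}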
*Problem* - An OOM / buffer overflow happens while constructing the deserializer with
SpecificRecord, but not with GenericRecord. I am also unable to use my generated class
directly, because I get errors saying it cannot be cast to a GenericRecord (even though
it extends/implements it through a chain of other classes).
Small example, run against Kafka and a Confluent Schema Registry locally:
{code}
public static void main(String[] args) throws Exception {
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
  // Pipeline p = getWritePipeline(options);
  Pipeline p = Pipeline.create(options);

  final String topic = "foobar-2";
  final SubjectNameStrategy subjectStrategy = new TopicNameStrategy();
  final String valueSubject = subjectStrategy.subjectName(topic, false, null); // schema not used

  final ConfluentSchemaRegistryDeserializerProvider<SpecificRecord> valueProvider =
      ConfluentSchemaRegistryDeserializerProvider.of(
          "http://localhost:8081", valueSubject, null,
          // TODO: This doesn't seem to work to get the SpecificRecord subclass in the apply function below
          ImmutableMap.of(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true));

  p
      .apply(KafkaIO.<byte[], SpecificRecord>read()
          .withBootstrapServers("localhost:9092")
          .withTopic(topic)
          .withKeyDeserializer(ByteArrayDeserializer.class) // Don't have any keys, but this is required
          .withValueDeserializer(valueProvider)
          .withConsumerConfigUpdates(ImmutableMap.of(
              ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                  OffsetResetStrategy.EARLIEST.name().toLowerCase(Locale.ROOT),
              ConsumerConfig.GROUP_ID_CONFIG, "beam-" + UUID.randomUUID()))
          .withoutMetadata())
      .apply(Values.create())
      // TODO: How to get SpecificRecord subclass?
      .apply(MapElements.via(new SimpleFunction<SpecificRecord, Void>() {
        @Override
        public Void apply(SpecificRecord input) {
          log.info("{}", input);
          return null;
        }
      }));

  p.run().waitUntilFinish();
}
{code}
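For comparison, the same read typed to GenericRecord does not hit the OOM. A sketch of that variant (using the two-argument ConfluentSchemaRegistryDeserializerProvider.of overload):
{code}
// GenericRecord variant of the same read; this one works for me
final ConfluentSchemaRegistryDeserializerProvider<GenericRecord> genericProvider =
    ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", valueSubject);

p
    .apply(KafkaIO.<byte[], GenericRecord>read()
        .withBootstrapServers("localhost:9092")
        .withTopic(topic)
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(genericProvider)
        .withoutMetadata())
    .apply(Values.create())
    .apply(MapElements.via(new SimpleFunction<GenericRecord, Void>() {
      @Override
      public Void apply(GenericRecord input) {
        log.info("{}", input); // record prints fine as a GenericRecord
        return null;
      }
    }));
{code}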
Avro schema that I am using. It generates a class Product.java that I would like to use
in place of SpecificRecord above (a sketch of the desired usage follows the schema):
{code}
{"type":"record","name":"Product","namespace":"cricket.jmoore.avro","fields":[{"name":"name","type":"string"}]}
{code}
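What I would like to write instead of the SpecificRecord-typed pipeline above is roughly the following (a sketch; getName() is the accessor from the generated class, and this is where the cannot-be-cast-to-GenericRecord error shows up):
{code}
// Desired usage with the generated Product class (sketch; currently fails with the cast error)
final ConfluentSchemaRegistryDeserializerProvider<Product> productProvider =
    ConfluentSchemaRegistryDeserializerProvider.of(
        "http://localhost:8081", valueSubject, null,
        ImmutableMap.of(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true));

p
    .apply(KafkaIO.<byte[], Product>read()
        .withBootstrapServers("localhost:9092")
        .withTopic(topic)
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(productProvider)
        .withoutMetadata())
    .apply(Values.create())
    .apply(MapElements.via(new SimpleFunction<Product, Void>() {
      @Override
      public Void apply(Product input) {
        log.info("{}", input.getName()); // accessor from the generated class
        return null;
      }
    }));
{code}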
*Beam Version*: 2.35.0
Dependencies:
{code}
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.8.1</version>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>7.0.1</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-direct-java</artifactId>
  <version>${beam.version}</version> <!-- 2.35.0 -->
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>${beam.version}</version>
</dependency>
{code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)