Hi Raghu,
When the KafkaIO sink code is merged, could you please add a couple of
examples to demonstrate that:
1) KafkaIO works fine with FlinkPipelineRunner
> The reason I am asking: earlier, when I tried to port the KafkaIO code
(adding a custom Sink copied from KafkaWriter) from the Spark runner into
the Beam SDK, I ended up registering 'KafkaIO.Write.Bound.class' inside
FlinkStreamingTransformTranslators (I don't want to update the Flink-runner
core API). It was a dirty hack and somehow worked (I didn't test Avro at that
time), so I quickly adopted Max's suggestion of the FlinkUnboundedSource &
Sink approach, which is very clean and worked for simple non-Avro data.
> Is Max's contrib 'FlinkUnboundedSink' going to be merged as well?
2) KafkaIO can serialize an Avro-generated POJO (e.g. the sample attached
here) into a byte array, and the corresponding Sink can deserialize it
effortlessly.
> This is a very important use case for the network industry, where
thousands of sensor-generated machine-data records are converted into Avro
and sent to Kafka.
> So I am trying to make this work seamlessly inside a Beam-Flink pipeline
(as explained in the other reply to the thread, based on Max's feedback).
** I am not yet able to serialize/deserialize the attached Avro, even after
following Max's suggestions and even after returning new AvroTypeInfo(avroType)
from AvroSerializationDeserializationSchema#getProducedType().
** I have posted the error messages in my other response.
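(For reference, the getProducedType() override I tried looks roughly like the
sketch below. It assumes the schema's type parameter is bounded to a class
extending Avro's SpecificRecordBase, which Flink's AvroTypeInfo constructor
requires; the names are otherwise the same as in Max's schema class quoted
further down the thread.)

    @Override
    public TypeInformation<T> getProducedType() {
        // Give Flink an Avro-aware TypeInformation instead of the generic POJO one.
        return new AvroTypeInfo<>(avroType);
    }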
Thanks
Kaniska
On Wed, Apr 27, 2016 at 9:18 PM, Raghu Angadi <[email protected]> wrote:
Oh, it does not work with 0.8.x, as it uses the new consumer API in 0.9.x.
On Wed, Apr 27, 2016 at 4:35 PM, kaniska Mandal <[email protected]> wrote:
Hi Raghu,
Thanks much for the update.
We are using Kafka 0.8.x.
Can you please also add a working example (Beam + Flink + Kafka)
with an Avro-generated POJO?
Kaniska
On Wed, Apr 27, 2016 at 12:14 PM, Raghu Angadi <[email protected]> wrote:
Kaniska,
If your Kafka cluster is running 0.9, you can also try the native KafkaIO
<https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L88>
recently merged into Beam. It supports reading from Kafka,
and I am working on Sink support (this week).
See TopHashtagsExample.java
<https://github.com/apache/incubator-beam/pull/142/commits/f5c809d5d3551c4fcb64bc7bcde0c64f8dd76e0a#diff-796ac0dad9e90975cfea2e2b05a90d69R121>
for a complete example (it writes results back to Kafka in a ParDo() rather
than a Sink). To use AvroCoder, your consumer test will have something like:
    pipeline.apply(KafkaIO.read()
        .withBootstrapServers(options.getBroker())
        .withTopics(ImmutableList.of(TOPICS))
        .withValueCoder(AvroCoder.of(User.class))
        ...
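As an illustration of the write-in-a-ParDo pattern mentioned above (until the
Sink lands), something roughly like the sketch below could publish
Avro-encoded elements back to Kafka. This is an assumption-laden sketch, not
the example's actual code: the class name WriteToKafkaFn and the User element
type are illustrative, it encodes elements with Beam's AvroCoder via
CoderUtils.encodeToByteArray, and the producer Properties are assumed to
configure byte-array key/value serializers.

    // Needs: org.apache.beam.sdk.transforms.DoFn, org.apache.beam.sdk.coders.AvroCoder,
    // org.apache.beam.sdk.util.CoderUtils, org.apache.kafka.clients.producer.*, java.util.Properties
    public static class WriteToKafkaFn extends DoFn<User, Void> {
        private final String topic;
        private final Properties producerProps;  // key/value serializers: ByteArraySerializer
        private final AvroCoder<User> coder = AvroCoder.of(User.class);
        private transient KafkaProducer<byte[], byte[]> producer;

        public WriteToKafkaFn(String topic, Properties producerProps) {
            this.topic = topic;
            this.producerProps = producerProps;
        }

        @Override
        public void startBundle(Context c) {
            // Create the (non-serializable) producer lazily on the worker.
            producer = new KafkaProducer<>(producerProps);
        }

        @Override
        public void processElement(ProcessContext c) throws Exception {
            // Avro-encode the element and publish it to the output topic.
            byte[] payload = CoderUtils.encodeToByteArray(coder, c.element());
            producer.send(new ProducerRecord<byte[], byte[]>(topic, payload));
        }

        @Override
        public void finishBundle(Context c) {
            producer.close();
        }
    }

It would then be applied in place of a sink, e.g.
results.apply(ParDo.of(new WriteToKafkaFn(topic, producerProps))).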
Raghu.
On Wed, Apr 27, 2016 at 10:05 AM, Maximilian Michels <[email protected]> wrote:
Hi Kaniska,
Thanks for your mail. First of all, let us state clearly the problem you are
trying to solve.
Do you want to use Avro serialization or Flink serialization? If you use the
TypeInformationSerializationSchema you use Flink's serialization stack - no
Avro involved then. You have to be consistent and stick with either one.
Otherwise problems are bound to happen as the Flink serialization doesn't
understand Avro's serialization and also the other way around.
From your initial question it appears you want to read/write Avro serialized
data from/to Kafka. So let's see how to do this:
============
A custom class
============
Let's assume you have a class like this:
    public class MyCounts implements Serializable {

        private String word;
        private long count;

        public MyCounts() {}

        public MyCounts(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "MyCounts{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
=================================
The Serialization / Deserialization Schema
=================================
This is the schema for the Kafka Producer/Consumer to
serialize/deserialize data:
    public class AvroSerializationDeserializationSchema<T>
            implements SerializationSchema<T>, DeserializationSchema<T> {

        private final Class<T> avroType;
        private final AvroCoder<T> coder;
        private transient ByteArrayOutputStream out;

        public AvroSerializationDeserializationSchema(Class<T> clazz) {
            this.avroType = clazz;
            this.coder = AvroCoder.of(clazz);
            this.out = new ByteArrayOutputStream();
        }

        @Override
        public byte[] serialize(T element) {
            if (out == null) {
                out = new ByteArrayOutputStream();
            }
            try {
                out.reset();
                coder.encode(element, out, Coder.Context.NESTED);
            } catch (IOException e) {
                throw new RuntimeException("Avro encoding failed.", e);
            }
            return out.toByteArray();
        }

        @Override
        public T deserialize(byte[] message) throws IOException {
            return coder.decode(new ByteArrayInputStream(message), Coder.Context.NESTED);
        }

        @Override
        public boolean isEndOfStream(T nextElement) {
            return false;
        }

        @Override
        public TypeInformation<T> getProducedType() {
            return TypeExtractor.getForClass(avroType);
        }
    }
======================================
Writing some Avro serialized data to a Kafka topic
======================================
    Pipeline pipeline = Pipeline.create(options);

    PCollection<MyCounts> words = pipeline.apply(Create.of(
        new MyCounts("word", 1L),
        new MyCounts("another", 2L),
        new MyCounts("yet another", 3L)));

    FlinkKafkaProducer08<MyCounts> kafkaSink =
        new FlinkKafkaProducer08<>(options.getKafkaOutputTopic(),
            new AvroSerializationDeserializationSchema<>(MyCounts.class),
            props);

    words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));

    pipeline.run();
Let's execute that.
====================================
Reading Avro serialized data from a Kafka topic
====================================
Now it's time to read the data back from Kafka:
    Pipeline pipeline = Pipeline.create(options);

    FlinkKafkaConsumer08<MyCounts> kafkaConsumer = new FlinkKafkaConsumer08<>(
        options.getKafkaTopic(),
        new AvroSerializationDeserializationSchema<>(MyCounts.class),
        props);

    PCollection<MyCounts> words = pipeline
        .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
        .apply(ParDo.of(new PrintFn()));

    pipeline.run();
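(PrintFn is not shown in the mail; a minimal version, assuming it simply logs
each element and passes it through, could look like this:)

    public static class PrintFn extends DoFn<MyCounts, MyCounts> {
        @Override
        public void processElement(ProcessContext c) {
            // Log the decoded element and forward it downstream unchanged.
            System.out.println(c.element());
            c.output(c.element());
        }
    }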
===================
Executing the examples
===================
It prints:
MyCounts{word='word', count=1}
MyCounts{word='another', count=2}
MyCounts{word='yet another', count=3}
Let me know if that helps you! I've omitted the Kafka props and options for
brevity.
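(For completeness, the omitted props for the Kafka 0.8 connectors would
typically look something like the following; the hosts and group id here are
just placeholders:)

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");   // used by the producer
    props.setProperty("zookeeper.connect", "localhost:2181");   // required by the 0.8 consumer
    props.setProperty("group.id", "beam-flink-avro-example");   // consumer group (placeholder)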
I hope that we will soon have native Kafka IO for both reading from and
writing to Kafka available in Beam.
Cheers,
Max
On Wed, Apr 27, 2016 at 4:21 AM, kaniska Mandal
<[email protected]> wrote:
> Sorry for cluttering the post with some code.
> I have attached a couple of Java files and one Avro-generated POJO.
>
> I am facing some issues while reading / writing data using Flink's
> serialization / deserialization schemas.
>
>
> A) << producer >> BeamKafkaFlinkAvroProducerTest
>
>>> if I use KafkaProducer directly (i.e. call produceSimpleData()),
>>> things are working fine (just for testing)
>
>>> Using FlinkKafkaProducer as UnboundedSource (this is what I should do)
>
> produceAvroData2() { ...
>
> 1) First, if I use >> AvroSerializationSchema schema = new AvroSerializationSchema(Test.class);
>
> i.e. essentially using Avro's org.apache.avro.specific.SpecificDatumWriter,
> I face the following error >>
>
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.IndexedRecord
>     at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
>     at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
>     at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
>     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>
>
> 2) Next, if I use TypeInformationSerializationSchema (irrespective of the
> AvroCoder in the Pipeline), things apparently work fine,
>
> as the Kafka test consumer tool prints the message >>
> java.lang.String{"uname": "Joe", "id": 6}
>
>
> B) << Consumer >> BeamKafkaFlinkAvroConsumerTest
>
>>> I understand we should either use TypeInformationSerializationSchema in
>>> both consumer and producer, OR
>
> we should use AvroDeserializationSchema and AvroSerializationSchema in the
> Consumer and Producer respectively !!
>
> But, irrespective of using AvroDeserializationSchema or
> TypeInformationSerializationSchema, I get the following exception >>
>
> Exception in thread "main" java.lang.NullPointerException: null value in entry: V=null
>     at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
>     at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
>     at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
>     at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
>     at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)
>
>
> Any clues what's wrong here? Maybe I am missing some simple step.
> I am trying to make a simple Avro schema work here, and eventually we need
> to deal with very complex Avro schemas.
>
> Much appreciate your help.
>
> Thanks
> Kaniska
>
> On Tue, Apr 26, 2016 at 9:05 AM, Maximilian Michels <[email protected]> wrote:
>>
>> Hi Kaniska,
>>
>> To read data from Kafka, you need to supply a DeserializationSchema.
>> Here is one you could use:
>> https://gist.github.com/StephanEwen/d515e10dd1c609f70bed
>>
>> Similarly, to write data into Kafka using the Producer, you will need
>> a SerializationSchema. You need to serialize your data into bytes
>> using your Avro schema. Actually, you could use the AvroCoder which is
>> supplied in Beam for this. Or you implement your own analogue to the
>> DeserializationSchema above.
>>
>> - Max
>>
>>
>> On Tue, Apr 26, 2016 at 8:43 AM, kaniska Mandal
>> <[email protected]> wrote:
>> > Hi,
>> >
>> > I followed the example in AvroITCase.java, which reads/writes Avro data
>> > from/to the filesystem.
>> >> I don't want to update AvroIO to embed a Kafka consumer / producer
>> >
>> > Also looked into https://issues.apache.org/jira/browse/FLINK-2597
>> >> But I couldn't instantiate TypeInformationSerializationSchema in the
>> >> following FlinkKafkaConsumer / Producer, as I am not sure how to get
>> >> access to the ExecutionConfig
>> >
>> > Essentially, I need help to change the following code in a Beam pipeline
>> >> to read Avro data from Kafka and deserialize it into my custom object.
>> >
>> > public static UnboundedSource<String, CheckpointMark> consumeMessages() {
>> >     FlinkKafkaConsumer08<String> kafkaConsumer =
>> >         new FlinkKafkaConsumer08<>(options.getKafkaTopic(), ? , props);
>> >
>> >     return UnboundedFlinkSource.of(kafkaConsumer);
>> > }
>> >
>> >> and how do I write Avro data into Kafka?
>> > public static void produceData() {
>> >     FlinkKafkaProducer08<String> kafkaSink =
>> >         new FlinkKafkaProducer08<>(TOPIC, ? , props);
>> >
>> >     Pipeline pipeline = Pipeline.create(options);
>> >     pipeline
>> >         .apply(Create.of(
>> >             new User("Joe", 3, "red"),
>> >             new User("Mary", 4, "blue"),
>> >             new User("Mark", 1, "green"),
>> >             new User("Julia", 5, "purple"))
>> >             .withCoder(AvroCoder.of(User.class)))
>> >         .apply(transformationToWriteToKafkaSink());
>> >
>> >     pipeline.run();
>> > }
>> >
>> > Thanks
>> > Kaniska
>
>