An update on this: I see that `IndexedRecord` is part of the Avro library.
Please correct me if I am wrong in assuming that POJOs generated by the
Avro POJO generator must implement the `IndexedRecord` interface (the
generated classes extend `SpecificRecordBase`, which implements
`SpecificRecord`, a sub-interface of `IndexedRecord`). It seems that
either:


   -  I should parse the stringified JSON from AWS Kinesis directly into
   Avro (first sketch below),
   -  *or* convert the GSON-parsed POJOs into Avro-compatible records at
   stream time (second sketch below).
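
For the first option, something like the sketch below might work, though
only if the incoming JSON already follows Avro's JSON encoding (unions, for
example, have to be wrapped as {"type": value}); JsonToAvroMapper is just
an illustrative name:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.reflect.ReflectData;
import org.apache.flink.api.common.functions.RichMapFunction;

public class JsonToAvroMapper extends RichMapFunction<String, GenericRecord> {

    private static final Schema schema = ReflectData.get().getSchema(Response.class);

    @Override
    public GenericRecord map(String json) throws Exception {
        // Decode the JSON text directly against the reflect-derived schema.
        JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, json);
        return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
    }
}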

Please let me know if anyone has a better way to do this.
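
For the second option, this is roughly what I have in mind: copy each POJO
field into the matching Avro field by name, instead of putting the whole
POJO into field 0 of the record. PojoToAvro is a hypothetical helper and is
only safe for a flat Response schema of primitives and Strings; nested
POJOs would need the same conversion applied recursively:

import java.lang.reflect.Field;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;

public final class PojoToAvro {

    private static final Schema schema = ReflectData.get().getSchema(Response.class);

    public static GenericRecord toRecord(Response response) throws Exception {
        GenericData.Record record = new GenericData.Record(schema);
        for (Schema.Field avroField : schema.getFields()) {
            // Find the POJO field with the same name and copy its value into
            // the corresponding position of the GenericRecord.
            Field pojoField = Response.class.getDeclaredField(avroField.name());
            pojoField.setAccessible(true);
            record.put(avroField.pos(), pojoField.get(response));
        }
        return record;
    }
}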


On Mon, 30 Aug 2021 at 10:13, tarun joshi <1985.ta...@gmail.com> wrote:

> Hey all,
> I am trying to write a simple pipeline:
>
> read stringified JSON from Kinesis -> parse to POJO -> convert to Avro
> -> write Parquet files to AWS S3.
>
> 1) This is my SimpleMapper
>
> public class SimpleMapper extends RichMapFunction<String, GenericRecord> {
>
>     // Gson configured to respect @Expose annotations on the POJO.
>     private static final GsonBuilder gsonBuilder =
>             new GsonBuilder().excludeFieldsWithoutExposeAnnotation().setPrettyPrinting();
>     private static final Gson gson = gsonBuilder.create();
>
>     // Avro schema derived from the POJO class via reflection.
>     private static final Schema schema = ReflectData.get().getSchema(Response.class);
>
>     @Override
>     public GenericRecord map(String s) throws Exception {
>         Response response = gson.fromJson(s, Response.class);
>         GenericData.Record record = new GenericData.Record(schema);
>         // Stores the entire POJO in field 0 of the record, rather than
>         // copying its values field by field.
>         record.put(0, response);
>         return record;
>     }
> }
>
> 2) This is my Job Definition
>
> public class ClickStreamPipeline implements Serializable {
>
>     private static Schema schema = ReflectData.get().getSchema(Response.class);
>
>     public static void main(String[] args) throws Exception {
>         final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
>         StreamExecutionEnvironment env = getStreamExecutionEnvironment(params);
>
>         FlinkKinesisConsumer<String> kinesisConsumer =
>                 new FlinkKinesisConsumer<>(
>                         "web-clickstream", new SimpleStringSchema(), getKafkaConsumerProperties());
>
>         // Bulk Parquet writer; rolls part files on every checkpoint.
>         final StreamingFileSink<GenericRecord> streamingFileSink =
>                 StreamingFileSink.forBulkFormat(
>                                 new Path("s3://data-ingestion-pipeline/flink_pipeline/"),
>                                 ParquetAvroWriters.forGenericRecord(schema))
>                         .withRollingPolicy(OnCheckpointRollingPolicy.build())
>                         .build();
>
>         env.addSource(kinesisConsumer)
>                 .map(new SimpleMapper())
>                 .returns(new GenericRecordAvroTypeInfo(schema))
>                 .addSink(streamingFileSink);
>
>         env.execute("Read files in streaming fashion");
>     }
>
>     private static StreamExecutionEnvironment getStreamExecutionEnvironment(
>             MultipleParameterTool params) throws ClassNotFoundException {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.getConfig().setGlobalJobParameters(params);
>
>         // Kryo fallback for JDK unmodifiable collections.
>         Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
>         env.getConfig()
>                 .addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
>         env.enableCheckpointing(60_000L);
>
>         return env;
>     }
> }
>
> The issue I am facing is that serializing the Avro GenericRecord-wrapped
> message fails:
>
>    - When I use GenericRecordAvroTypeInfo(schema) to force Avro as the
>    preferred serializer, I get the error below:
>
> *java.lang.ClassCastException: class <my fully qualified POJO> cannot be
> cast to class org.apache.avro.generic.IndexedRecord*
>
>    - If I don't use the GenericRecordAvroTypeInfo and instead try to
>    register my POJO with the KryoSerializer, serialization fails with an
>    NPE somewhere in my Schema class. Do I need to implement/register a
>    proper Avro serializer with the Flink config? (A guess at what that
>    might look like is sketched below.)
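>
>    A minimal guess, assuming ExecutionConfig's force-Avro switch is the
>    relevant knob here:
>
>        // Ask Flink to serialize recognized POJO types with its Avro-based
>        // serializer instead of falling back to Kryo.
>        env.getConfig().enableForceAvro();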
>
> Thanks for the help!
>
