This is resolved by the first approach I mentioned: parsing the stringified JSON from Kinesis directly into Avro.
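
For anyone landing on this thread later, here is a minimal sketch of that approach using Avro's JSON decoder. The `Click` schema and its fields are hypothetical stand-ins (the real `Response` schema is not shown in this thread), and the class and method names are made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;

public class JsonToAvroSketch {

    // Hypothetical schema for illustration only; substitute the real one.
    static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Click\",\"fields\":["
                    + "{\"name\":\"userId\",\"type\":\"string\"},"
                    + "{\"name\":\"clicks\",\"type\":\"int\"}]}");

    // Decodes one JSON document straight into a GenericRecord,
    // with no intermediate POJO.
    static GenericRecord parse(Schema schema, String json) {
        try {
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            Decoder decoder = DecoderFactory.get().jsonDecoder(schema, json);
            return reader.read(null, decoder);
        } catch (IOException e) {
            // Rethrow unchecked so callers such as a map() body stay simple.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        GenericRecord record = parse(SCHEMA, "{\"userId\":\"u1\",\"clicks\":3}");
        System.out.println(record);
    }
}
```

In the pipeline, the body of `SimpleMapper.map` could call a helper like this instead of `gson.fromJson` followed by `record.put(0, response)`. One caveat: the decoder expects Avro's own JSON encoding, so union-typed (for example nullable) fields have to be wrapped as `{"string": "value"}` rather than given as bare values. Incoming JSON that does not already follow that convention would need preprocessing or a different conversion step.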

Thanks Team

On Mon, 30 Aug 2021 at 12:35, tarun joshi <1985.ta...@gmail.com> wrote:

> An update on this: I see that `IndexedRecord` is part of the Avro library.
> Please correct me if I am wrong in assuming that POJOs generated by the
> Avro POJO generator must implement the `IndexedRecord` interface. It seems
> I should either:
>
>
>    -  parse the stringified JSON from AWS Kinesis directly into
>    Avro, *or*
>    -  convert the Gson-parsed POJOs into Avro-compatible POJOs at
>    stream time.
>
> Please let me know if anyone has a better way to do this.
>
>
> On Mon, 30 Aug 2021 at 10:13, tarun joshi <1985.ta...@gmail.com> wrote:
>
>> Hey all,
>> I am trying to write a simple pipeline that does the following:
>>
>> read stringified JSON from Kinesis -> parse to POJO -> convert to Avro
>> -> write Parquet files to AWS S3.
>>
>> 1) This is my SimpleMapper
>>
>> public class SimpleMapper extends RichMapFunction<String, GenericRecord> {
>>     private static final GsonBuilder gsonBuilder =
>>             new GsonBuilder().excludeFieldsWithoutExposeAnnotation().setPrettyPrinting();
>>
>>     private static final Gson gson = gsonBuilder.create();
>>     // Avro schema derived by reflection from the Response POJO.
>>     private static final Schema schema = ReflectData.get().getSchema(Response.class);
>>
>>     @Override
>>     public GenericRecord map(String s) throws Exception {
>>
>>         Response response = gson.fromJson(s, Response.class);
>>         GenericData.Record record = new GenericData.Record(schema);
>>         // Stores the whole POJO in field 0 of the record.
>>         record.put(0, response);
>>
>>         return record;
>>     }
>> }
>>
>> 2) This is my Job Definition
>>
>> public class ClickStreamPipeline implements Serializable {
>>
>>     private static Schema schema = ReflectData.get().getSchema(Response.class);
>>
>>     public static void main(String[] args) throws Exception {
>>         final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
>>         StreamExecutionEnvironment env = getStreamExecutionEnvironment(params);
>>
>>         // Source: raw JSON strings from the Kinesis stream.
>>         FlinkKinesisConsumer<String> kinesisConsumer =
>>                 new FlinkKinesisConsumer<>(
>>                         "web-clickstream", new SimpleStringSchema(), getKafkaConsumerProperties());
>>
>>         // Sink: Parquet files on S3, rolled on every checkpoint.
>>         final StreamingFileSink<GenericRecord> streamingFileSink =
>>                 StreamingFileSink.forBulkFormat(
>>                         new Path("s3://data-ingestion-pipeline/flink_pipeline/"),
>>                         ParquetAvroWriters.forGenericRecord(schema))
>>                         .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>                         .build();
>>
>>         env.addSource(kinesisConsumer)
>>                 .map(new SimpleMapper())
>>                 .returns(new GenericRecordAvroTypeInfo(schema))
>>                 .addSink(streamingFileSink);
>>
>>         env.execute("Read files in streaming fashion");
>>     }
>>
>>     private static StreamExecutionEnvironment getStreamExecutionEnvironment(
>>             MultipleParameterTool params) throws ClassNotFoundException {
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.getConfig().setGlobalJobParameters(params);
>>
>>         Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
>>         env.getConfig()
>>                 .addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
>>         env.enableCheckpointing(60_000L);
>>
>>         return env;
>>     }
>> }
>>
>>
>>
>> The issue I am facing is a failure to serialize the message wrapped in an
>> Avro GenericRecord:
>>
>>    - When I use GenericRecordAvroTypeInfo(schema) to force Avro as the
>>    preferred serializer, I get the error below:
>>
>>
>> *              java.lang.ClassCastException: class <my fully qualified
>> POJO> cannot be cast to class org.apache.avro.generic.IndexedRecord*
>>
>>
>>
>>    - If I don't use GenericRecordAvroTypeInfo and instead try to register my
>>    POJO with the Kryo serializer, serialization fails with an NPE somewhere in
>>    my Schema class. Do I need to implement or register a proper Avro serializer
>>    in the Flink config?
>>
>> Thanks for the help!
>>
>
