Re: Flink issues with Avro GenericRecord serialization
This is resolved by the first approach I mentioned: parsing the stringified JSON from AWS Kinesis directly into Avro. Thanks, team.
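For the archive, a minimal sketch of what that first approach can look like: build the GenericRecord directly from the parsed JSON, so that only Avro-supported values ever end up inside the record. The flat schema and the field names (userId, url) are hypothetical stand-ins, not the original pipeline's Response schema:

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class JsonToAvroMapper extends RichMapFunction<String, GenericRecord> {

    // Avro's Schema is not serializable in some versions, so ship it as a
    // String and parse it on the task manager in open().
    private final String schemaString;
    private transient Schema schema;

    public JsonToAvroMapper(String schemaString) {
        this.schemaString = schemaString;
    }

    @Override
    public void open(Configuration parameters) {
        schema = new Schema.Parser().parse(schemaString);
    }

    @Override
    public GenericRecord map(String json) {
        JsonObject obj = JsonParser.parseString(json).getAsJsonObject();
        GenericData.Record record = new GenericData.Record(schema);
        // Copy each JSON value into the matching Avro field so that every
        // stored value is an Avro-supported type, never an arbitrary POJO.
        record.put("userId", obj.get("userId").getAsString());
        record.put("url", obj.get("url").getAsString());
        return record;
    }
}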
Re: Flink issues with Avro GenericRecord serialization
An update on this: I see that `IndexedRecord` is part of the Avro library. Please correct me if I am wrong in assuming that POJOs generated by the Avro POJO generator must implement the `IndexedRecord` interface (see the sketch after this message). It seems that either:

- I should parse the stringified JSON from AWS Kinesis directly into Avro, *or*
- I should convert those GSON-parsed POJOs into Avro-compatible POJOs at stream time.

Please let me know if anyone has a better way to do this.
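On the assumption above: Avro's code-generated classes do implement IndexedRecord (they extend SpecificRecordBase, whose SpecificRecord interface extends IndexedRecord), and GenericData.Record implements it through GenericRecord. The interface itself is small; paraphrased from the Avro sources, with comments added:

import org.apache.avro.generic.GenericContainer;

// Paraphrased from org.apache.avro.generic.IndexedRecord (comments added);
// GenericContainer contributes getSchema().
public interface IndexedRecord extends GenericContainer {
    void put(int i, Object v);  // set the value of field i, in schema field order
    Object get(int i);          // return the value of field i, in schema field order
}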
Flink issues with Avro GenericRecord serialization
Hey all,

I am trying to write a simple pipeline that reads stringified JSON from Kinesis -> parses it to a POJO -> converts it to Avro -> writes Parquet files to AWS S3.

1) This is my SimpleMapper:

public class SimpleMapper extends RichMapFunction<String, GenericRecord> {
    private static final GsonBuilder gsonBuilder =
            new GsonBuilder().excludeFieldsWithoutExposeAnnotation().setPrettyPrinting();

    private static final Gson gson = gsonBuilder.create();
    private static final Schema schema = ReflectData.get().getSchema(Response.class);

    @Override
    public GenericRecord map(String s) throws Exception {
        Response response = gson.fromJson(s, Response.class);
        GenericData.Record record = new GenericData.Record(schema);
        record.put(0, response);
        return record;
    }
}

2) This is my job definition:

public class ClickStreamPipeline implements Serializable {

    private static Schema schema = ReflectData.get().getSchema(Response.class);

    public static void main(String[] args) throws Exception {
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = getStreamExecutionEnvironment(params);

        FlinkKinesisConsumer<String> kinesisConsumer =
                new FlinkKinesisConsumer<>(
                        "web-clickstream", new SimpleStringSchema(), getKafkaConsumerProperties());

        final StreamingFileSink<GenericRecord> streamingFileSink =
                StreamingFileSink.forBulkFormat(
                                new Path("s3://data-ingestion-pipeline/flink_pipeline/"),
                                ParquetAvroWriters.forGenericRecord(schema))
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build();

        env.addSource(kinesisConsumer)
                .map(new SimpleMapper())
                .returns(new GenericRecordAvroTypeInfo(schema))
                .addSink(streamingFileSink);

        env.execute("Read files in streaming fashion");
    }

    private static StreamExecutionEnvironment getStreamExecutionEnvironment(
            MultipleParameterTool params) throws ClassNotFoundException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
        env.getConfig()
                .addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
        env.enableCheckpointing(60_000L);

        return env;
    }
}

The issue I am facing is failing to serialize the Avro GenericRecord-wrapped message:

- When I use new GenericRecordAvroTypeInfo(schema) to force Avro as the preferred serializer, I get the error below:

  java.lang.ClassCastException: class <my POJO> cannot be cast to class org.apache.avro.generic.IndexedRecord

- If I don't use the GenericRecordAvroTypeInfo and try to register my POJO with the Kryo serializer, the serialization fails with an NPE somewhere in my Schema class. Do I need to implement/register a proper Avro serializer with the Flink config?

Thanks for the help!
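The ClassCastException above follows from record.put(0, response): field 0 of the reflect schema is Response's *first field*, not Response itself, so the record ends up holding a plain GSON POJO in a slot the schema may declare as a nested record. Flink's Avro path (GenericRecordAvroTypeInfo is backed by a GenericDatumWriter) casts such values to IndexedRecord while writing, and a plain POJO does not implement it. A minimal, self-contained sketch of that failing cast, with hypothetical stand-in classes:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.reflect.ReflectData;

public class CastFailureDemo {

    // Hypothetical stand-ins for the thread's GSON-parsed POJO.
    public static class Inner { public String value = "x"; }
    public static class Response { public Inner payload = new Inner(); }

    public static void main(String[] args) {
        Schema schema = ReflectData.get().getSchema(Response.class);
        GenericData.Record record = new GenericData.Record(schema);

        // Field 0 of Response's schema is "payload" (a record-typed field),
        // yet we store the whole POJO there, as the original mapper did:
        record.put(0, new Response());

        // GenericDatumWriter does the equivalent of this cast when it
        // recurses into a record-typed field:
        IndexedRecord nested = (IndexedRecord) record.get(0);  // ClassCastException
        System.out.println(nested);
    }
}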