An update on this: I see that `IndexedRecord` is part of the Avro library. Please correct me if I am wrong in assuming that POJOs generated by the Avro POJO generator must implement the `IndexedRecord` interface. It seems that either:
- I should be parsing the stringified JSON from AWS Kinesis directly into Avro, *or*
- I should convert those GSON-parsed POJOs into Avro-compatible POJOs at stream time (see the sketch after the quoted message below).

Please let me know if anyone has a better way to do this.

On Mon, 30 Aug 2021 at 10:13, tarun joshi <1985.ta...@gmail.com> wrote:

> Hey all,
> I am trying to write a simple pipeline:
>
> read stringified JSON from Kinesis -> parse to POJO -> convert to Avro,
> for the purpose of writing Parquet files to AWS S3.
>
> 1) This is my SimpleMapper:
>
> public class SimpleMapper extends RichMapFunction<String, GenericRecord> {
>     private static final GsonBuilder gsonBuilder =
>             new GsonBuilder()
>                     .excludeFieldsWithoutExposeAnnotation()
>                     .setPrettyPrinting();
>
>     private static final Gson gson = gsonBuilder.create();
>     private static final Schema schema =
>             ReflectData.get().getSchema(Response.class);
>
>     @Override
>     public GenericRecord map(String s) throws Exception {
>         Response response = gson.fromJson(s, Response.class);
>         GenericData.Record record = new GenericData.Record(schema);
>         record.put(0, response);
>
>         return record;
>     }
> }
>
> 2) This is my job definition:
>
> public class ClickStreamPipeline implements Serializable {
>
>     private static Schema schema =
>             ReflectData.get().getSchema(Response.class);
>
>     public static void main(String[] args) throws Exception {
>         final MultipleParameterTool params =
>                 MultipleParameterTool.fromArgs(args);
>         StreamExecutionEnvironment env =
>                 getStreamExecutionEnvironment(params);
>
>         FlinkKinesisConsumer<String> kinesisConsumer =
>                 new FlinkKinesisConsumer<>(
>                         "web-clickstream",
>                         new SimpleStringSchema(),
>                         getKafkaConsumerProperties());
>
>         final StreamingFileSink<GenericRecord> streamingFileSink =
>                 StreamingFileSink.forBulkFormat(
>                                 new Path("s3://data-ingestion-pipeline/flink_pipeline/"),
>                                 ParquetAvroWriters.forGenericRecord(schema))
>                         .withRollingPolicy(OnCheckpointRollingPolicy.build())
>                         .build();
>
>         env.addSource(kinesisConsumer)
>                 .map(new SimpleMapper())
>                 .returns(new GenericRecordAvroTypeInfo(schema))
>                 .addSink(streamingFileSink);
>
>         env.execute("Read files in streaming fashion");
>     }
>
>     private static StreamExecutionEnvironment getStreamExecutionEnvironment(
>             MultipleParameterTool params) throws ClassNotFoundException {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.getConfig().setGlobalJobParameters(params);
>
>         Class<?> unmodColl =
>                 Class.forName("java.util.Collections$UnmodifiableCollection");
>         env.getConfig()
>                 .addDefaultKryoSerializer(unmodColl,
>                         UnmodifiableCollectionsSerializer.class);
>         env.enableCheckpointing(60_000L);
>
>         return env;
>     }
> }
>
> The issue I am facing is that serialization of the Avro GenericRecord-wrapped
> message fails:
>
> - When I use GenericRecordAvroTypeInfo(schema) to force Flink to use Avro
>   as the preferred serializer, I get the error below:
>
>   *java.lang.ClassCastException: class <my fully qualified
>   POJO> cannot be cast to class org.apache.avro.generic.IndexedRecord*
>
> - If I don't use GenericRecordAvroTypeInfo and instead try to register my
>   POJO with the Kryo serializer, serialization fails with an NPE somewhere
>   in my Schema class. Do I need to implement/register a proper Avro
>   serializer with the Flink config?
>
> Thanks for the help!
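On the stream-time conversion option: the ClassCastException in the quoted message is consistent with `record.put(0, response)` storing the entire POJO in field 0 of the GenericRecord, whereas the Avro serializer expects each field to hold a value matching that field's schema. A minimal sketch of a field-by-field mapper, assuming hypothetical `userId` and `url` fields on Response (the actual field names are not shown in the thread):

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.flink.api.common.functions.RichMapFunction;

public class FieldWiseMapper extends RichMapFunction<String, GenericRecord> {
    private static final Gson gson =
            new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();
    private static final Schema schema =
            ReflectData.get().getSchema(Response.class);

    @Override
    public GenericRecord map(String s) throws Exception {
        Response response = gson.fromJson(s, Response.class);
        GenericData.Record record = new GenericData.Record(schema);
        // Copy each field into the record individually instead of putting
        // the whole POJO into position 0. getUserId()/getUrl() are
        // placeholders for the real Response accessors.
        record.put("userId", response.getUserId());
        record.put("url", response.getUrl());
        return record;
    }
}

With this shape, `returns(new GenericRecordAvroTypeInfo(schema))` should match what the Avro serializer expects, since every field value is Avro-compatible rather than a reflected POJO.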
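Alternatively, since the schema is already derived via ReflectData, one way to avoid GenericRecord altogether is to keep Response as the stream type and let the Parquet writer serialize it through Avro reflection. A sketch under that assumption, reusing `env` and `kinesisConsumer` from the job definition above (Gson is created per record here only for brevity; in practice it belongs in a RichMapFunction as in the original code):

// Write Response POJOs directly; forReflectRecord uses Avro reflection,
// so neither GenericRecord nor GenericRecordAvroTypeInfo is involved.
final StreamingFileSink<Response> reflectSink =
        StreamingFileSink.forBulkFormat(
                        new Path("s3://data-ingestion-pipeline/flink_pipeline/"),
                        ParquetAvroWriters.forReflectRecord(Response.class))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();

env.addSource(kinesisConsumer)
        .map(s -> new Gson().fromJson(s, Response.class))
        .returns(Response.class)
        .addSink(reflectSink);

This sidesteps the IndexedRecord cast entirely; the trade-off is that Response then has to be a type Flink can serialize between operators (a clean POJO, or one with a registered serializer).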