Re: Flink issues with Avro GenericRecord serialization

2021-08-31 Thread tarun joshi
This is resolved by the first approach I mentioned.
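Roughly, the fix was to have the mapper parse the JSON straight into the
GenericRecord instead of going through the GSON POJO. A minimal sketch of the
idea (the field names here are made up, and it assumes a recent Gson for
JsonParser.parseString):

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class JsonToAvroMapper extends RichMapFunction<String, GenericRecord> {
    // Ship the schema as a String and re-parse it in open(), since Avro's
    // Schema class is not reliably Serializable across versions.
    private final String schemaString;
    private transient Schema schema;

    public JsonToAvroMapper(Schema schema) {
        this.schemaString = schema.toString();
    }

    @Override
    public void open(Configuration parameters) {
        schema = new Schema.Parser().parse(schemaString);
    }

    @Override
    public GenericRecord map(String s) {
        JsonObject json = JsonParser.parseString(s).getAsJsonObject();
        GenericData.Record record = new GenericData.Record(schema);
        // Populate each Avro field with an Avro-compatible value rather
        // than putting a whole POJO into a single field.
        record.put("url", json.get("url").getAsString());            // hypothetical field
        record.put("timestamp", json.get("timestamp").getAsLong());  // hypothetical field
        return record;
    }
}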

Thanks Team



Re: Flink issues with Avro GenericRecord serialization

2021-08-30 Thread tarun joshi
An update on this: I see that `IndexedRecord` is part of the Avro library.
Please correct me if I am wrong in assuming that POJOs generated by the Avro
code generator must implement the `IndexedRecord` interface. It seems I should
do one of the following:


   - Parse the stringified JSON from AWS Kinesis directly into Avro.
   - *Or* convert the GSON-parsed POJOs into Avro-compatible POJOs at
   stream time (rough sketch below).

Please let me know if anyone has a better way to do this.
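
For the second option, I imagine the conversion could be done by round-tripping
through Avro itself: write the GSON-parsed POJO with a reflect writer, then read
the bytes back as a GenericRecord (an untested sketch; it adds a per-record
encode/decode cost):

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;

public final class PojoToGenericRecord {
    private static final Schema SCHEMA = ReflectData.get().getSchema(Response.class);

    // Write the POJO with reflect-based Avro, then read the bytes back as a
    // GenericRecord, so downstream operators see a real IndexedRecord.
    public static GenericRecord convert(Response response) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new ReflectDatumWriter<Response>(SCHEMA).write(response, encoder);
        encoder.flush();
        return new GenericDatumReader<GenericRecord>(SCHEMA)
                .read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
    }
}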




Flink issues with Avro GenericRecord serialization

2021-08-30 Thread tarun joshi
Hey all,
I am trying to write a simple pipeline:

Read stringified JSON from Kinesis -> parse to POJO -> convert to Avro
-> write Parquet files to AWS S3.

1) This is my SimpleMapper

public class SimpleMapper extends RichMapFunction<String, GenericRecord> {
    private static final GsonBuilder gsonBuilder =
            new GsonBuilder().excludeFieldsWithoutExposeAnnotation().setPrettyPrinting();

    private static final Gson gson = gsonBuilder.create();
    // Derive the Avro schema from the POJO class via reflection.
    private static final Schema schema = ReflectData.get().getSchema(Response.class);

    @Override
    public GenericRecord map(String s) throws Exception {
        Response response = gson.fromJson(s, Response.class);
        GenericData.Record record = new GenericData.Record(schema);
        record.put(0, response);
        return record;
    }
}
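
A note on the mapper above: record.put(0, response) stores the entire POJO in
the record's first field, while the ReflectData schema describes the Response
fields individually, which lines up with the ClassCastException reported below.
A field-by-field copy inside map() would look roughly like this (a plain-reflection
sketch that only handles flat fields; nested records would need recursion):

GenericData.Record record = new GenericData.Record(schema);
for (Schema.Field field : schema.getFields()) {
    // ReflectData field names mirror the POJO's field names.
    java.lang.reflect.Field pojoField = Response.class.getDeclaredField(field.name());
    pojoField.setAccessible(true);
    record.put(field.name(), pojoField.get(response));
}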

2) This is my Job Definition

public class ClickStreamPipeline implements Serializable {

    private static Schema schema = ReflectData.get().getSchema(Response.class);

    public static void main(String[] args) throws Exception {
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = getStreamExecutionEnvironment(params);

        FlinkKinesisConsumer<String> kinesisConsumer =
                new FlinkKinesisConsumer<>(
                        "web-clickstream", new SimpleStringSchema(), getKafkaConsumerProperties());

        final StreamingFileSink<GenericRecord> streamingFileSink =
                StreamingFileSink.forBulkFormat(
                                new Path("s3://data-ingestion-pipeline/flink_pipeline/"),
                                ParquetAvroWriters.forGenericRecord(schema))
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build();

        env.addSource(kinesisConsumer)
                .map(new SimpleMapper())
                // Tell Flink to treat the mapper output as an Avro GenericRecord type.
                .returns(new GenericRecordAvroTypeInfo(schema))
                .addSink(streamingFileSink);

        env.execute("Read files in streaming fashion");
    }

    private static StreamExecutionEnvironment getStreamExecutionEnvironment(
            MultipleParameterTool params) throws ClassNotFoundException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        // Kryo cannot serialize unmodifiable collections out of the box.
        Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
        env.getConfig()
                .addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
        env.enableCheckpointing(60_000L);

        return env;
    }
}



The issue I am facing is that serializing the Avro GenericRecord-wrapped
message fails:

   - When I use GenericRecordAvroTypeInfo(schema) to force Avro as the
   preferred serializer, I get the error below:

*java.lang.ClassCastException: class <GSON-parsed POJO> cannot be cast to
class org.apache.avro.generic.IndexedRecord*



   - If I don't use the GenericRecordAvroTypeInfo and instead try to register
   my POJO with the Kryo serializer, serialization fails with an NPE somewhere
   in my Schema class. Do I need to implement/register a proper Avro serializer
   in the Flink config? (One possible knob is sketched after the sign-off.)

Thanks for the help!
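
On the last question: one knob worth knowing about, though not a verified fix
for the GenericRecord case here, is that with the flink-avro dependency on the
classpath Flink can be forced to serialize POJO types with Avro instead of Kryo:

// Force Avro-based serialization for POJO types; requires flink-avro.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceAvro();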