Hi Khachatryan,

This is the use case for creating multiple streams:

I have a use case where multiple types of Avro records are coming in a single
Kafka topic, as we are using TopicRecordNameStrategy for the subject in the
schema registry. I have written a consumer to read that topic and build a
DataStream of GenericRecord. I cannot sink this stream to HDFS/S3 in Parquet
format as it is, because the stream contains records with different schemas.
So I am filtering the records for each type by applying a filter, creating a
separate stream per type, and then sinking each stream separately.
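
For reference, the same split can also be done in a single pass with Flink's
side outputs instead of one filter per event type. The following is only a
sketch of that alternative, reusing dataStream, eventTypesList, EVENT_NAME,
and SEARCH_LIST_KEYLESS from the code quoted below, and assuming the event
names are known up front:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    // sketch only: one OutputTag per event type (the anonymous subclass
    // keeps the generic type information for Flink)
    final Map<String, OutputTag<GenericRecord>> tags = new HashMap<>();
    for (EventConfig eventConfig : eventTypesList) {
        tags.put(eventConfig.getEvent_name(),
                new OutputTag<GenericRecord>(eventConfig.getEvent_name()) {});
    }

    // route each record to the side output matching its event name
    SingleOutputStreamOperator<GenericRecord> routed = dataStream
            .process(new ProcessFunction<GenericRecord, GenericRecord>() {
                @Override
                public void processElement(GenericRecord record, Context ctx,
                        Collector<GenericRecord> out) throws Exception {
                    OutputTag<GenericRecord> tag =
                            tags.get(record.get(EVENT_NAME).toString());
                    if (tag != null) {
                        ctx.output(tag, record);
                    }
                }
            });

    // one per-type stream, e.g. for the search events
    DataStream<GenericRecord> searchStream =
            routed.getSideOutput(tags.get(SEARCH_LIST_KEYLESS));

Each getSideOutput call then gives a per-type stream that can be wired to its
own StreamingFileSink, so the topic is only scanned once.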

So can you please help me create multiple dynamic streams with the code that
I shared? How can I resolve this issue?
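
For the NullPointerException itself, a defensive version of the lookup would
fail fast with a clear message when the key is missing. This is only a sketch,
reusing streamMap, SEARCH_LIST_KEYLESS, and Enricher from the code quoted
below; it also reads Tuple2's f0/f1 fields instead of calling getField:

    // sketch only: fail fast with the available keys instead of an NPE
    Tuple2<DataStream<GenericRecord>, StreamingFileSink<GenericRecord>> entry =
            streamMap.get(SEARCH_LIST_KEYLESS);
    if (entry == null) {
        throw new IllegalStateException("no stream registered for key "
                + SEARCH_LIST_KEYLESS + "; available keys: " + streamMap.keySet());
    }
    DataStream<GenericRecord> searchStream = entry.f0;  // Tuple2 fields: f0, f1
    searchStream.map(new Enricher()).addSink(entry.f1);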

On Tue, Feb 25, 2020 at 10:46 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> As I understand from the code, streamMap is a Java map, not a Scala one, so
> you can get an NPE when dereferencing the value you got from it.
>
> Also, the approach looks a bit strange.
> Can you describe what you are trying to achieve?
>
> Regards,
> Roman
>
>
> On Mon, Feb 24, 2020 at 5:47 PM aj <ajainje...@gmail.com> wrote:
>
>>
>> I am trying the below piece of code to create multiple DataStream objects
>> and store them in a map.
>>
>> for (EventConfig eventConfig : eventTypesList) {
>>     LOGGER.info("creating a stream for {}", eventConfig.getEvent_name());
>>     String key = eventConfig.getEvent_name();
>>
>>     // one Parquet sink per event type, using that type's registry schema
>>     final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>>             .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
>>                     SchemaUtils.getSchema(eventConfig.getSchema_subject())))
>>             .withBucketAssigner(new EventTimeBucketAssigner())
>>             .build();
>>
>>     // keep only the records whose event name matches this config
>>     DataStream<GenericRecord> stream = dataStream.filter(
>>             (FilterFunction<GenericRecord>) genericRecord -> genericRecord
>>                     .get(EVENT_NAME).toString()
>>                     .equals(eventConfig.getEvent_name()));
>>
>>     streamMap.put(key, new Tuple2<>(stream, sink));
>> }
>>
>> DataStream<GenericRecord> searchStream =
>>         streamMap.get(SEARCH_LIST_KEYLESS).getField(0);
>> searchStream.map(new Enricher())
>>         .addSink(streamMap.get(SEARCH_LIST_KEYLESS).getField(1));
>>
>>
>> I am getting a NullPointerException when trying to get the stream from the
>> map value at:
>>
>>
>> *DataStream<GenericRecord> searchStream = streamMap.get(SEARCH_LIST_KEYLESS).getField(0);*
>>
>> As per my understanding, this is because the map is local to main and is
>> not broadcast to the tasks.
>> If I want to do this, how should I do it? Please help me resolve this.
>>
>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>>
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>
