Hi Khachatryan,

This is the use case for creating multiple streams:
I have a use case where multiple types of Avro records arrive on a single Kafka topic, because we are using TopicRecordNameStrategy for the subject in the schema registry. I have written a consumer that reads the topic and builds a DataStream of GenericRecord. I cannot sink this stream to HDFS/S3 in Parquet format directly, because it contains records with different schemas. So I filter the records by type, creating a separate stream for each type, and then sink each stream separately.

Could you please help me create these streams dynamically with the code that I shared? How can I resolve this issue?

On Tue, Feb 25, 2020 at 10:46 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:

> As I understand from code, streamMap is a Java map, not Scala. So you can
> get NPE while unreferencing the value you got from it.
>
> Also, the approach looks a bit strange.
> Can you describe what are you trying to achieve?
>
> Regards,
> Roman
>
>
> On Mon, Feb 24, 2020 at 5:47 PM aj <ajainje...@gmail.com> wrote:
>
>> I am trying below piece of code to create multiple datastreams object and
>> store in map.
>>
>> for (EventConfig eventConfig : eventTypesList) {
>>     LOGGER.info("creating a stream for ", eventConfig.getEvent_name());
>>     String key = eventConfig.getEvent_name();
>>     final StreamingFileSink<GenericRecord> sink =
>>         StreamingFileSink.forBulkFormat(path,
>>                 ParquetAvroWriters.forGenericRecord(
>>                     SchemaUtils.getSchema(eventConfig.getSchema_subject())))
>>             .withBucketAssigner(new EventTimeBucketAssigner())
>>             .build();
>>
>>     DataStream<GenericRecord> stream =
>>         dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> {
>>             if (genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name())) {
>>                 return true;
>>             }
>>             return false;
>>         });
>>
>>     Tuple2<DataStream<GenericRecord>, StreamingFileSink<GenericRecord>> tuple2 =
>>         new Tuple2<>(stream, sink);
>>     streamMap.put(key, tuple2);
>> }
>>
>> DataStream<GenericRecord> searchStream =
>>     streamMap.get(SEARCH_LIST_KEYLESS).getField(0);
>> searchStream.map(new Enricher()).addSink(streamMap.get(SEARCH_LIST_KEYLESS).getField(1));
>>
>>
>> I am getting Nullpointer Exception when trying to get the stream from map
>> value at :
>>
>> *DataStream<GenericRecord> searchStream =
>> streamMap.get(SEARCH_LIST_KEYLESS).getField(0);*
>>
>> As per my understanding, this is due to the map is local to main and not
>> broadcast to tasks.
>> If I want to do this how should I do, please help me to resolve this?
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>

--
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07

<http://www.oracle.com/>
<http://www.cse.iitm.ac.in/%7Eanujjain/>
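
P.S. To illustrate Roman's remark about the Java map: Map.get returns null when the key is absent, so calling getField(0) on the result would throw a NullPointerException. One possible cause (an assumption, not confirmed in this thread) is that the value of SEARCH_LIST_KEYLESS does not match any getEvent_name() stored in the loop. The sketch below is plain Java with no Flink dependency; the Route class, the method names, and the event names are hypothetical stand-ins for the Tuple2 of stream and sink.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the map lookup; it does not use Flink.
class StreamLookupSketch {

    // Hypothetical stand-in for Tuple2<DataStream<GenericRecord>, StreamingFileSink<GenericRecord>>.
    static final class Route {
        final String eventName;

        Route(String eventName) {
            this.eventName = eventName;
        }
    }

    // Mirrors the for-loop in the quoted code: one map entry per configured event name.
    static Map<String, Route> buildRoutes(List<String> eventNames) {
        Map<String, Route> routes = new LinkedHashMap<>();
        for (String name : eventNames) {
            routes.put(name, new Route(name));
        }
        return routes;
    }

    // Map.get returns null for an absent key, so check the result and fail
    // with a descriptive message instead of a bare NullPointerException.
    static Route requireRoute(Map<String, Route> routes, String key) {
        Route route = routes.get(key);
        if (route == null) {
            throw new IllegalStateException(
                "No stream registered for '" + key + "'; known keys: " + routes.keySet());
        }
        return route;
    }

    public static void main(String[] args) {
        // Event names are made up for illustration.
        Map<String, Route> routes = buildRoutes(List.of("search_list_keyless", "cart_add"));

        // Key present: the lookup succeeds.
        System.out.println(requireRoute(routes, "search_list_keyless").eventName);

        // Key absent (different case): the message lists the keys that do exist.
        try {
            requireRoute(routes, "SEARCH_LIST_KEYLESS");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Applied to the quoted code, the same guard around streamMap.get(SEARCH_LIST_KEYLESS) would show which keys were actually registered, which should help confirm or rule out this cause.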