Hello Community,

We want to add an Enrichment operator to the Malhar library.

Here are some initial details about it:

UseCase:
=================
Data enrichment is an extremely common and important step in almost ALL
batch and stream processing flows.
Streaming use cases deal with log data, which often lacks context and
metadata. That metadata is required for any further analytical processing.
This operator allows one to enrich stream data with data from an external
source.

Functionality:
=================
1. Take a POJO as input and emit an enriched POJO according to the
configuration.
2. The external store will be configurable and will follow a plugin model.
3. Initially, support will be added for JDBC, HBase and file-based stores.
4. The operator will perform a reference lookup against these external stores
to enrich each incoming tuple.
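
To make the flow concrete, here is a minimal Java sketch of what the operator does for each tuple. It is an illustration under simplifying assumptions, not the proposed Malhar implementation: `EnrichmentFlowSketch`, `LogEvent` and `EnrichedEvent` are hypothetical names, there is a single lookup key and a single included field, and an in-memory `Map` stands in for a real JDBC, HBase or file-based store. Emitting unmatched tuples un-enriched is also an assumption; the proposal does not say how misses are handled.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the enrichment flow: POJO in, reference lookup, enriched POJO out.
// All class names are hypothetical; an in-memory Map stands in for the external store.
public class EnrichmentFlowSketch {

    // Hypothetical input tuple: a log record that only carries a user id.
    public static class LogEvent {
        public String userId;
        public String action;

        public LogEvent(String userId, String action) {
            this.userId = userId;
            this.action = action;
        }
    }

    // Hypothetical output tuple: the input fields plus one field from the store.
    public static class EnrichedEvent {
        public String userId;
        public String action;
        public String country; // filled in from the store, null if no match
    }

    // Stand-in for a JDBC, HBase or file-based store: lookup key -> included fields.
    private final Map<String, Map<String, Object>> store = new HashMap<>();

    public void put(String key, Map<String, Object> row) {
        store.put(key, row);
    }

    // Performs the reference lookup and emits an enriched copy of the tuple.
    public EnrichedEvent enrich(LogEvent in) {
        EnrichedEvent out = new EnrichedEvent();
        out.userId = in.userId;
        out.action = in.action;
        Map<String, Object> row = store.get(in.userId);
        if (row != null) {
            out.country = (String) row.get("country");
        }
        return out; // assumption: unmatched tuples are emitted un-enriched
    }

    public static void main(String[] args) {
        EnrichmentFlowSketch op = new EnrichmentFlowSketch();
        Map<String, Object> row = new HashMap<>();
        row.put("country", "IN");
        op.put("u1", row);
        EnrichedEvent e = op.enrich(new LogEvent("u1", "login"));
        System.out.println(e.userId + " " + e.action + " " + e.country); // prints "u1 login IN"
    }
}
```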

Design:
=================
1. As mentioned above, stores (viz. database loaders) will use a plugin-based
mechanism.
2. To make the loaders pluggable, they will implement a common interface as
follows:

     import java.util.List;

     public interface DBLoader extends com.datatorrent.lib.db.cache.CacheManager.Backup
     {
         public void setFields(List<String> lookupFields, List<String> includeFields);
         public void setFieldInfo(List<FieldInfo> fieldInfos);
     }

3. Each of the loaders mentioned above (JDBC, HBase, file, etc.) will
implement this interface, and the Enrichment operator will use an instance of
it to look up the missing fields to be enriched.
4. Both the input and output ports of the Enrichment operator must have the
TUPLE_CLASS attribute set, so that the operator knows the upstream and
downstream tuple types.
This means the input schema can be separate from the output schema.
5. The Enrichment operator will use PojoUtils to create getters and setters,
which will be used to build the enriched output object on the fly.
6. The user needs to configure enrichmentMap wherever a column name or input
field name differs from the corresponding output field name.
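
The design points above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the proposed operator: `SimpleLoader` is a hypothetical, simplified stand-in for `DBLoader` (it does not extend `CacheManager.Backup`), plain `java.lang.reflect` replaces PojoUtils, and the `outClass` argument plays the role of the output port's TUPLE_CLASS. Treating `enrichmentMap` as "store column name to output field name" is one possible reading of point 6.

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

// Sketch of the design under stated assumptions: SimpleLoader is a hypothetical,
// simplified stand-in for DBLoader (it does not extend CacheManager.Backup), and
// plain java.lang.reflect is used where the operator would use PojoUtils.
public class EnrichmentDesignSketch {

    // Hypothetical pluggable loader: returns the included fields for a key, or null.
    public interface SimpleLoader {
        Map<String, Object> lookup(Object key);
    }

    // In-memory loader; JDBC, HBase or file loaders would implement the same interface.
    public static class MapLoader implements SimpleLoader {
        private final Map<Object, Map<String, Object>> rows = new HashMap<>();

        public void add(Object key, Map<String, Object> row) {
            rows.put(key, row);
        }

        @Override
        public Map<String, Object> lookup(Object key) {
            return rows.get(key);
        }
    }

    private final SimpleLoader loader;
    private final String lookupField;                // input field used as the lookup key
    private final Map<String, String> enrichmentMap; // store column -> output field

    public EnrichmentDesignSketch(SimpleLoader loader, String lookupField,
                                  Map<String, String> enrichmentMap) {
        this.loader = loader;
        this.lookupField = lookupField;
        this.enrichmentMap = enrichmentMap;
    }

    // outClass plays the role of the output port's TUPLE_CLASS attribute.
    public <O> O enrich(Object in, Class<O> outClass) {
        try {
            O out = outClass.getDeclaredConstructor().newInstance();
            // Copy fields that exist in both the input and the output schema.
            for (Field f : in.getClass().getFields()) {
                Field target = findField(outClass, f.getName());
                if (target != null) {
                    target.set(out, f.get(in));
                }
            }
            // Reference lookup through the pluggable loader.
            Object key = in.getClass().getField(lookupField).get(in);
            Map<String, Object> row = loader.lookup(key);
            if (row != null) {
                for (Map.Entry<String, Object> col : row.entrySet()) {
                    // Columns absent from enrichmentMap keep their own name.
                    String name = enrichmentMap.getOrDefault(col.getKey(), col.getKey());
                    Field target = findField(outClass, name);
                    if (target != null) {
                        target.set(out, col.getValue());
                    }
                }
            }
            return out;
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("enrichment failed", ex);
        }
    }

    private static Field findField(Class<?> c, String name) {
        try {
            return c.getField(name);
        } catch (NoSuchFieldException ex) {
            return null; // field not present in this schema
        }
    }

    // Hypothetical input and output schemas.
    public static class Order { public String customerId; public double amount; }
    public static class EnrichedOrder { public String customerId; public double amount; public String customerName; }

    public static void main(String[] args) {
        MapLoader loader = new MapLoader();
        Map<String, Object> row = new HashMap<>();
        row.put("name", "Alice");
        loader.add("c42", row);
        Map<String, String> mapping = new HashMap<>();
        mapping.put("name", "customerName");
        EnrichmentDesignSketch op = new EnrichmentDesignSketch(loader, "customerId", mapping);
        Order o = new Order();
        o.customerId = "c42";
        o.amount = 9.5;
        EnrichedOrder e = op.enrich(o, EnrichedOrder.class);
        System.out.println(e.customerId + " " + e.amount + " " + e.customerName); // prints "c42 9.5 Alice"
    }
}
```

Keeping the loader behind an interface means the lookup logic in `enrich` stays the same whether the rows come from JDBC, HBase or a file. A real operator would likely resolve its getters and setters once (for example in setup) rather than looking up fields reflectively for every tuple as this sketch does.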

Please share your feedback on the above.

Thanks in advance,
Chinmay.
