Github user sandeepdeshmukh commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60871561
--- Diff:
contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java ---
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.PojoUtils;
+
+
+/**
+ * This class takes a POJO as input and extracts the value of the lookupKey
+ * configured for this operator. It then does a lookup in file/DB to find a
+ * matching entry, and all key-value pairs specified in the file/DB or based
+ * on the include fieldMap are added to the original tuple.
+ * This operator is App Builder schema support enabled. <br>
+ * <p>
+ * Properties:<br>
+ * <b>inputClass</b>: Class to be loaded for the incoming data type<br>
+ * <b>outputClass</b>: Class to be loaded for the emitted data type<br>
+ * <br>
+ * <p>
+ * Example
+ * The file contains data in JSON format, one entry per line. During setup, the
+ * entire file is read and kept in memory for quick lookup.
+ * If the file contains the following lines, and the operator is configured with
+ * lookup key "productId":
+ * { "productId": 1, "productCategory": 3 }
+ * { "productId": 4, "productCategory": 10 }
+ * { "productId": 3, "productCategory": 1 }
+ * <p>
+ * And input tuple is
+ * { amount=10.0, channelId=4, productId=3 }
+ * <p>
+ * The tuple is modified as below before the operator emits it on the output port.
+ * { amount=10.0, channelId=4, productId=3, productCategory=1 }
+ *
+ * @displayName BeanEnrichment
+ * @category Database
+ * @tags enrichment, pojo, schema, lookup
+ */
+@InterfaceStability.Evolving
+public class POJOEnricher extends AbstractEnricher<Object, Object>
+{
+ private static final Logger logger =
LoggerFactory.getLogger(POJOEnricher.class);
+
+ /**
+ * Helper fields
+ */
+ protected Class<?> inputClass;
+ protected Class<?> outputClass;
+ private transient Map<PojoUtils.Getter, PojoUtils.Setter> fieldMap = new
HashMap<>();
+ private transient List<PojoUtils.Setter> includeSetters = new
ArrayList<>();
+ private transient List<PojoUtils.Getter> lookupGetters = new
ArrayList<>();
+
+ @InputPortFieldAnnotation(schemaRequired = true)
+ public final transient DefaultInputPort<Object> input = new
DefaultInputPort<Object>()
+ {
+ @Override
+ public void setup(Context.PortContext context)
+ {
+ inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
+
+ @Override
+ public void process(Object object)
+ {
+ processTuple(object);
+ }
+ };
+
+ @OutputPortFieldAnnotation(schemaRequired = true)
+ public final transient DefaultOutputPort<Object> output = new
DefaultOutputPort<Object>()
+ {
+ @Override
+ public void setup(Context.PortContext context)
+ {
+ outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
+ };
+
+ protected void processTuple(Object object)
+ {
+ enrichTuple(object);
+ }
+
+ @Override
+ protected Object getKey(Object tuple)
+ {
+ ArrayList<Object> keyList = new ArrayList<>();
+ for (PojoUtils.Getter lookupGetter : lookupGetters) {
+ keyList.add(lookupGetter.get(tuple));
+ }
+ return keyList;
+ }
+
+ @Override
+ protected Object convert(Object in, Object cached)
+ {
+ Object o;
+
+ try {
+ o = outputClass.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ logger.error("Failed to create new instance of output POJO", e);
+ return null;
+ }
+
+ for (Map.Entry<PojoUtils.Getter, PojoUtils.Setter> entry :
fieldMap.entrySet()) {
+ entry.getValue().set(o, entry.getKey().get(in));
+ }
+
+ if (cached == null) {
+ return null;
+ }
+
+ ArrayList<Object> includeObjects = (ArrayList<Object>)cached;
+ int idx = 0;
+ for (PojoUtils.Setter includeSetter : includeSetters) {
+ try {
+ includeSetter.set(o, includeObjects.get(idx++));
+ } catch (RuntimeException e) {
+ logger.error("Failed to set the property. Continuing with
default.", e);
+ }
+ }
+
+ return o;
+ }
+
+ @Override
+ protected void emitTuple(Object tuple)
+ {
+ output.emit(tuple);
+ }
+
+ @Override
+ protected Class<?> getIncludeFieldType(String fieldName)
+ {
+ try {
+ return outputClass.getDeclaredField(fieldName).getType();
+ } catch (NoSuchFieldException e) {
+ logger.warn("Failed to find given fieldName, returning object type",
e);
+ return Object.class;
+ }
+ }
+
+ @Override
+ protected Class<?> getLookupFieldType(String fieldName)
+ {
+ try {
+ return inputClass.getDeclaredField(fieldName).getType();
+ } catch (NoSuchFieldException e) {
+ logger.warn("Failed to find given fieldName, returning object type",
e);
+ return Object.class;
+ }
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private PojoUtils.Setter generateSettersForField(Class<?> klass, String
outputFieldName)
+ throws NoSuchFieldException, SecurityException
+ {
+ Field f = outputClass.getDeclaredField(outputFieldName);
+ Class c = ClassUtils.primitiveToWrapper(f.getType());
+ PojoUtils.Setter classSetter = PojoUtils.createSetter(klass,
outputFieldName, c);
--- End diff --
Return the result of `PojoUtils.createSetter(...)` directly.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---