[
https://issues.apache.org/jira/browse/APEXMALHAR-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15247551#comment-15247551
]
ASF GitHub Bot commented on APEXMALHAR-2023:
--------------------------------------------
Github user sandeepdeshmukh commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60210490
--- Diff:
contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java ---
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.NotNull;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.db.cache.CacheManager;
+import com.datatorrent.lib.db.cache.CacheStore;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+
+/**
+ * Base class for Enrichment Operator. Subclasses must implement getKey, convert, emitTuple,
+ * getLookupFieldType and getIncludeFieldType.
+ * The operator receives a tuple and emits an enriched tuple based on includeFields and
+ * lookupFields. <br/>
+ * <p>
+ * Properties:<br>
+ * <b>lookupFields</b>: List of comma separated keys for quick searching.
Ex: Field1,Field2,Field3<br>
+ * <b>includeFields</b>: List of comma separated fields to be
replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
+ * <b>store</b>: Backend loader used to look up data that is not found in the cache<br>
+ * <br>
+ *
+ * @param <INPUT> Type of tuples which are received by this operator
+ * @param <OUTPUT> Type of tuples which are emitted by this operator
+ * @displayName Abstract Enrichment Operator
+ * @tags Enrichment
+ */
[email protected]
+public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator
implements Operator.ActivationListener
+{
+ /**
+ * Mandatory parameters for Enricher
+ */
+ // NOTE(review): @NotNull here resolves to com.esotericsoftware.kryo.NotNull (see imports), which is
+ // a Kryo serialization hint rather than a validation constraint; confirm whether
+ // javax.validation.constraints.NotNull was intended.
+ // Names of the fields that form the lookup key; getKey() must return values in this order.
+ @NotNull
+ protected List<String> lookupFields;
+ // Names of the fields to be added/replaced from the looked-up data.
+ // Despite @NotNull, activate() tolerates this being null.
+ @NotNull
+ protected List<String> includeFields;
+ // Registered as the cache manager's backup in setup(); receives the FieldInfo lists in activate().
+ @NotNull
+ private BackendLoader store;
+
+ /**
+ * Optional parameters for enricher.
+ */
+ // Passed to CacheStore.setEntryExpiryDurationInMillis() in setup(): milliseconds, default one day.
+ private int cacheExpirationInterval = 24 * 60 * 60 * 1000;
+ // Passed to CacheStore.setCacheCleanupInMillis() in setup(): milliseconds, default one day.
+ private int cacheCleanupInterval = 24 * 60 * 60 * 1000;
+ // Passed to CacheStore.setMaxCacheSize() in setup().
+ private int cacheSize = 1024;
+
+ /**
+ * Helper variables.
+ */
+ // Transient: rebuilt in setup() rather than carried with the operator's state.
+ private transient CacheManager cacheManager;
+ // Filled in activate() from lookupFields / includeFields and handed to store.setFieldInfo().
+ protected transient List<FieldInfo> lookupFieldInfo = new ArrayList<>();
+ protected transient List<FieldInfo> includeFieldInfo = new ArrayList<>();
+
+ /**
+ * Processes one input tuple for enrichment; implementing classes call this for every tuple.
+ * The tuple goes through the following stages:
+ * <ol>
+ * <li>Call {@link #getKey(Object)} to retrieve the key fields for lookup.</li>
+ * <li>Look the key up through the cache manager, which uses the primary cache and, for keys that
+ * are not cached yet, the configured backend store.</li>
+ * <li>Pass the input tuple and the retrieved value to {@link #convert(Object, Object)} to build
+ * the enriched tuple.</li>
+ * <li>Call {@link #emitTuple(Object)} to emit the enriched tuple.</li>
+ * </ol>
+ * If {@link #getKey(Object)} returns {@code null}, the tuple is dropped: no lookup is made and
+ * nothing is emitted.
+ *
+ * @param tuple Input tuple that needs to get processed for enrichment.
+ */
+ protected void enrichTuple(INPUT tuple)
+ {
+ Object key = getKey(tuple);
+ if (key != null) {
+ Object result = cacheManager.get(key);
+ OUTPUT out = convert(tuple, result);
+ emitTuple(out);
+ }
+ }
+
+ /**
+ * Returns the values of the fields that form the lookup key for the given tuple.
+ * Concrete classes are expected to return an {@code ArrayList<Object>} holding those values in
+ * the same order as {@link #lookupFields}.
+ * Returning {@code null} makes {@link #enrichTuple(Object)} drop the tuple without emitting it.
+ *
+ * @param tuple Input tuple from which the key field values are fetched.
+ * @return {@code ArrayList<Object>} with the key field values in the same order as
+ * {@link #lookupFields}, or {@code null} to skip the tuple
+ */
+ protected abstract Object getKey(INPUT tuple);
+
+ /**
+ * Builds the enriched output tuple; must be implemented by the concrete class.
+ * Takes the input tuple and an externally fetched object containing the fields to be added,
+ * and returns an enriched tuple that is ready to be emitted.
+ *
+ * @param in Input tuple which needs to be enriched.
+ * @param cached Value returned by the cache manager for the tuple's key; expected to be an
+ * {@code ArrayList<Object>} containing the missing data retrieved from external sources.
+ * @return Enriched tuple of type OUTPUT
+ */
+ protected abstract OUTPUT convert(INPUT in, Object cached);
+
+ /**
+ * Emits an enriched tuple; must be implemented by the concrete class.
+ * Called by {@link #enrichTuple(Object)} with the result of {@link #convert(Object, Object)}.
+ *
+ * @param tuple Tuple of type OUTPUT that should be emitted.
+ */
+ protected abstract void emitTuple(OUTPUT tuple);
+
+ /**
+ * Returns the Class type of the given include field in the output tuple.
+ * Must be implemented by the concrete class; used by {@link #activate(Context)} to build
+ * {@link #includeFieldInfo}.
+ *
+ * @param fieldName Name of the include field whose type needs to be identified
+ * @return Class type for the given field.
+ */
+ protected abstract Class<?> getIncludeFieldType(String fieldName);
+
+ /**
+ * Returns the Class type of the given lookup field in the input tuple.
+ * Must be implemented by the concrete class; used by {@link #activate(Context)} to build
+ * {@link #lookupFieldInfo}.
+ *
+ * @param fieldName Name of the lookup field whose type needs to be identified
+ * @return Class type for the given field.
+ */
+ protected abstract Class<?> getLookupFieldType(String fieldName);
+
+ /**
+ * Creates the cache manager. Its primary cache is a {@link CacheStore} configured from
+ * cacheExpirationInterval, cacheCleanupInterval (both in milliseconds) and cacheSize, and its
+ * backup is the configured backend store.
+ */
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+
+ cacheManager = new NullValuesCacheManager();
+ CacheStore primaryCache = new CacheStore();
+
+ // Entries expire cacheExpirationInterval ms after being written (configurable; default one day).
+ primaryCache.setEntryExpiryDurationInMillis(cacheExpirationInterval);
+ primaryCache.setCacheCleanupInMillis(cacheCleanupInterval);
+ primaryCache.setEntryExpiryStrategy(CacheStore.ExpiryType.EXPIRE_AFTER_WRITE);
+ primaryCache.setMaxCacheSize(cacheSize);
+
+ cacheManager.setPrimary(primaryCache);
+ cacheManager.setBackup(store);
+ }
+
+ /**
+ * Builds the FieldInfo lists for the lookup and include fields, hands them to the backend store
+ * and initializes the cache manager.
+ * Both lists are cleared first: an operator can go through several activate/deactivate cycles
+ * between setup and teardown, and appending on every activation would pass duplicate FieldInfo
+ * entries to the store.
+ *
+ * @param context activation context (not used)
+ * @throws RuntimeException if the cache manager cannot be initialized
+ */
+ @Override
+ public void activate(Context context)
+ {
+ lookupFieldInfo.clear();
+ includeFieldInfo.clear();
+
+ for (String s : lookupFields) {
+ lookupFieldInfo.add(new FieldInfo(s, s, SupportType.getFromJavaType(getLookupFieldType(s))));
+ }
+
+ // includeFields may be null; the store then receives an empty include list.
+ if (includeFields != null) {
+ for (String s : includeFields) {
+ includeFieldInfo.add(new FieldInfo(s, s, SupportType.getFromJavaType(getIncludeFieldType(s))));
+ }
+ }
+
+ store.setFieldInfo(lookupFieldInfo, includeFieldInfo);
+
+ try {
+ cacheManager.initialize();
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to initialize primary cache", e);
+ }
+ }
+
+ @Override
+ public void deactivate()
+ {
--- End diff --
Close any resources, like db connection for the cacheManager
> Adding Enrichment Operator to Malhar
> ------------------------------------
>
> Key: APEXMALHAR-2023
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2023
> Project: Apache Apex Malhar
> Issue Type: New Feature
> Components: adapters database
> Affects Versions: 3.3.1
> Reporter: Chinmay Kolhatkar
> Assignee: Chinmay Kolhatkar
>
> Add Enrichment Operator to Apex Malhar.
> Discussion is taking place on the mailing list here:
> http://mail-archives.apache.org/mod_mbox/incubator-apex-dev/201603.mbox/%3CCAKJfLDMo24-Gcvum2ZL8-0JOnE8QLryAy0Zu_R5zhMd_bsJyHw%40mail.gmail.com%3E
> Ponymail permalink:
> https://pony-poc.apache.org/thread.html/Z8t5ut5pu5vprgt
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)