[
https://issues.apache.org/jira/browse/APEXMALHAR-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264366#comment-15264366
]
ASF GitHub Bot commented on APEXMALHAR-2023:
--------------------------------------------
Github user chinmaykolhatkar commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r61614306
--- Diff:
contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java ---
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.esotericsoftware.kryo.NotNull;
+import com.google.common.collect.Maps;
+import com.datatorrent.lib.util.FieldInfo;
+
+
+/**
+ * This implementation of {@link BackendLoader} loads the data from the given file, puts it in an in-memory cache and serves the
+ * queries from the same cache.
+ * When this becomes part of the cache manager, it can call {@link #loadInitialData()} periodically to reload the file.
+ * <p>
+ * The format of the file looks like following:
+ * <p>
+ * {"productCategory": 5, "productId": 0}
+ * {"productCategory": 4, "productId": 1}
+ * {"productCategory": 5, "productId": 2}
+ * {"productCategory": 5, "productId": 3}
+ * </p>
+ * Each line in the file should be a valid JSON object which represents one record, and each key/value pair in that JSON object
+ * represents a field and its value.
+ * <p>
+ * NOTE: This loader should be used with caution as all the data present in the file is loaded in memory.
+ */
+@InterfaceStability.Evolving
+public class FSLoader extends ReadOnlyBackup
+{
+ @NotNull
+ private String fileName;
+
+ private transient Path filePath;
+ private transient FileSystem fs;
+ private transient boolean connected;
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+ private static final ObjectReader reader = mapper.reader(new
TypeReference<Map<String, Object>>(){});
+ private static final Logger logger =
LoggerFactory.getLogger(FSLoader.class);
+
+ /**
+  * Returns the name of the file this loader is configured to read.
+  *
+  * @return the value last supplied through {@link #setFileName(String)}
+  */
+ public String getFileName()
+ {
+   return this.fileName;
+ }
+
+ /**
+  * Sets the name of the file to read the records from.
+  *
+  * @param fileName file name; {@code connect()} turns it into a Hadoop {@code Path}
+  */
+ public void setFileName(String fileName)
+ {
+   this.fileName = fileName;
+ }
+
+ @Override
+ public Map<Object, Object> loadInitialData()
+ {
+ Map<Object, Object> result = null;
+ FSDataInputStream in = null;
+ BufferedReader bin = null;
+ try {
+ result = Maps.newHashMap();
+ in = fs.open(filePath);
+ bin = new BufferedReader(new InputStreamReader(in));
+ String line;
+ while ((line = bin.readLine()) != null) {
+ try {
+ Map<String, Object> tuple = reader.readValue(line);
+ result.put(getKey(tuple), getValue(tuple));
+ } catch (JsonProcessingException parseExp) {
+ logger.info("Unable to parse line {}", line);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (bin != null) {
+ IOUtils.closeQuietly(bin);
+ }
+ if (in != null) {
+ IOUtils.closeQuietly(in);
+ }
+ try {
+ fs.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ logger.debug("loading initial data {}", result.size());
+ return result;
+ }
+
+ /**
+  * Builds the cached value for one record by collecting, for every entry of
+  * {@code includeFieldInfo} in iteration order, the record's value for that column.
+  *
+  * @param tuple one parsed record, looked up by column name
+  * @return a list with one element per include field; an element is {@code null}
+  *         when the record holds no value for that column
+  */
+ private Object getValue(Map<String, Object> tuple)
+ {
+   List<Object> values = new ArrayList<Object>();
+   for (FieldInfo field : includeFieldInfo) {
+     values.add(tuple.get(field.getColumnName()));
+   }
+   return values;
+ }
+
+ /**
+  * Builds the cache key for one record by collecting, for every entry of
+  * {@code lookupFieldInfo} in iteration order, the record's value for that column.
+  *
+  * @param tuple one parsed record, looked up by column name
+  * @return a list with one element per lookup field; an element is {@code null}
+  *         when the record holds no value for that column
+  */
+ private Object getKey(Map<String, Object> tuple)
+ {
+   List<Object> keyParts = new ArrayList<Object>();
+   for (FieldInfo field : lookupFieldInfo) {
+     keyParts.add(tuple.get(field.getColumnName()));
+   }
+   return keyParts;
+ }
+
+ /**
+  * Single-key lookup; this loader does not implement it.
+  *
+  * @param key the lookup key (ignored)
+  * @return always {@code null}
+  */
+ @Override
+ public Object get(Object key)
+ {
+   return null;
+ }
+
+ /**
+  * Multi-key lookup; this loader does not implement it.
+  *
+  * @param keys the lookup keys (ignored)
+  * @return always {@code null}
+  */
+ @Override
+ public List<Object> getAll(List<Object> keys)
+ {
+   return null;
+ }
+
+ @Override
+ public void connect() throws IOException
+ {
+ Configuration conf = new Configuration();
+ this.filePath = new Path(fileName);
+ this.fs = FileSystem.newInstance(filePath.toUri(), conf);
--- End diff --
Considering the size of the file, I think it might just be more efficient
to reload the file.
This FSLoader is meant to be used only when the file is small enough to be
loaded in memory.
> Adding Enrichment Operator to Malhar
> ------------------------------------
>
> Key: APEXMALHAR-2023
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2023
> Project: Apache Apex Malhar
> Issue Type: New Feature
> Components: adapters database
> Affects Versions: 3.3.1
> Reporter: Chinmay Kolhatkar
> Assignee: Chinmay Kolhatkar
>
> Add an Enrichment Operator to Apex Malhar.
> Discussion is happening on the mailing list here:
> http://mail-archives.apache.org/mod_mbox/incubator-apex-dev/201603.mbox/%3CCAKJfLDMo24-Gcvum2ZL8-0JOnE8QLryAy0Zu_R5zhMd_bsJyHw%40mail.gmail.com%3E
> Ponymail permalink:
> https://pony-poc.apache.org/thread.html/Z8t5ut5pu5vprgt
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)