Repository: nifi
Updated Branches:
  refs/heads/master 59d3c6419 -> bfe92b900


NIFI-3970: Add CSVRecordLookupService


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/63840377
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/63840377
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/63840377

Branch: refs/heads/master
Commit: 63840377dda653146884a96b7955092f24f34dea
Parents: 59d3c64
Author: Matthew Burgess <mattyb...@apache.org>
Authored: Wed Sep 6 13:01:22 2017 -0400
Committer: Matthew Burgess <mattyb...@apache.org>
Committed: Mon Dec 11 19:14:27 2017 -0500

----------------------------------------------------------------------
 .../nifi/lookup/CSVRecordLookupService.java     | 232 +++++++++++++++++++
 ...org.apache.nifi.controller.ControllerService |   1 +
 .../nifi/lookup/TestCSVRecordLookupService.java |  69 ++++++
 3 files changed, 302 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/63840377/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
new file mode 100644
index 0000000..fdec5f2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
+import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
+
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Tags({"lookup", "cache", "enrich", "join", "csv", "reloadable", "key", 
"value", "record"})
+@CapabilityDescription("A reloadable CSV file-based lookup service. When the 
lookup key is found in the CSV file, the remaining columns are returned as a 
Record.")
+public class CSVRecordLookupService extends AbstractControllerService 
implements RecordLookupService {
+
+    private static final String KEY = "key";
+
+    private static final Set<String> REQUIRED_KEYS = 
Collections.unmodifiableSet(Stream.of(KEY).collect(Collectors.toSet()));
+
+    public static final PropertyDescriptor CSV_FILE =
+            new PropertyDescriptor.Builder()
+                    .name("csv-file")
+                    .displayName("CSV File")
+                    .description("A CSV file.")
+                    .required(true)
+                    .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+                    .expressionLanguageSupported(true)
+                    .build();
+
+    static final PropertyDescriptor CSV_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("CSV Format")
+            .description("Specifies which \"format\" the CSV data is in, or 
specifies if custom formatting should be used.")
+            .expressionLanguageSupported(false)
+            
.allowableValues(Arrays.asList(CSVFormat.Predefined.values()).stream().map(e -> 
e.toString()).collect(Collectors.toSet()))
+            .defaultValue(CSVFormat.Predefined.Default.toString())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOOKUP_KEY_COLUMN =
+            new PropertyDescriptor.Builder()
+                    .name("lookup-key-column")
+                    .displayName("Lookup Key Column")
+                    .description("Lookup key column.")
+                    .required(true)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .expressionLanguageSupported(true)
+                    .build();
+
+    public static final PropertyDescriptor IGNORE_DUPLICATES =
+            new PropertyDescriptor.Builder()
+                    .name("ignore-duplicates")
+                    .displayName("Ignore Duplicates")
+                    .description("Ignore duplicate keys for records in the CSV 
file.")
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .allowableValues("true", "false")
+                    .defaultValue("true")
+                    .required(true)
+                    .build();
+
+    private List<PropertyDescriptor> properties;
+
+    private volatile ConcurrentMap<String, Map<String, Object>> cache;
+
+    private volatile String csvFile;
+
+    private volatile CSVFormat csvFormat;
+
+    private volatile String lookupKeyColumn;
+
+    private volatile boolean ignoreDuplicates;
+
+    private volatile SynchronousFileWatcher watcher;
+
+    private final ReentrantLock lock = new ReentrantLock();
+
+    private void loadCache() throws IllegalStateException, IOException {
+        if (lock.tryLock()) {
+            try {
+                final ComponentLog logger = getLogger();
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Loading lookup table from file: " + csvFile);
+                }
+
+                final FileReader reader = new FileReader(csvFile);
+                final CSVParser records = 
csvFormat.withFirstRecordAsHeader().parse(reader);
+                this.cache = new ConcurrentHashMap<>();
+                for (final CSVRecord record : records) {
+                    final String key = record.get(lookupKeyColumn);
+
+                    if (StringUtils.isBlank(key)) {
+                        throw new IllegalStateException("Empty lookup key 
encountered in: " + csvFile);
+                    } else if (!ignoreDuplicates && 
this.cache.containsKey(key)) {
+                        throw new IllegalStateException("Duplicate lookup key 
encountered: " + key + " in " + csvFile);
+                    } else if (ignoreDuplicates && 
this.cache.containsKey(key)) {
+                        logger.warn("Duplicate lookup key encountered: {} in 
{}", new Object[]{key, csvFile});
+                    }
+
+                    // Put each key/value pair (except the lookup) into the 
properties
+                    final Map<String, Object> properties = new HashMap<>();
+                    record.toMap().forEach((k, v) -> {
+                        if (!lookupKeyColumn.equals(k)) {
+                            properties.put(k, v);
+                        }
+                    });
+                    cache.put(key, properties);
+                }
+
+                if (cache.isEmpty()) {
+                    logger.warn("Lookup table is empty after reading file: " + 
csvFile);
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    protected void init(final ControllerServiceInitializationContext context) 
throws InitializationException {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(CSV_FILE);
+        properties.add(CSV_FORMAT);
+        properties.add(LOOKUP_KEY_COLUMN);
+        properties.add(IGNORE_DUPLICATES);
+        this.properties = Collections.unmodifiableList(properties);
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws 
InitializationException, IOException, FileNotFoundException {
+        this.csvFile = context.getProperty(CSV_FILE).getValue();
+        this.csvFormat = 
CSVFormat.Predefined.valueOf(context.getProperty(CSV_FORMAT).getValue()).getFormat();
+        this.lookupKeyColumn = 
context.getProperty(LOOKUP_KEY_COLUMN).getValue();
+        this.ignoreDuplicates = 
context.getProperty(IGNORE_DUPLICATES).asBoolean();
+        this.watcher = new SynchronousFileWatcher(Paths.get(csvFile), new 
LastModifiedMonitor(), 30000L);
+        try {
+            loadCache();
+        } catch (final IllegalStateException e) {
+            throw new InitializationException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public Optional<Record> lookup(final Map<String, String> coordinates) 
throws LookupFailureException {
+        if (coordinates == null) {
+            return Optional.empty();
+        }
+
+        final String key = coordinates.get(KEY);
+        if (StringUtils.isBlank(key)) {
+            return Optional.empty();
+        }
+
+        try {
+            if (watcher != null && watcher.checkAndReset()) {
+                loadCache();
+            }
+        } catch (final IllegalStateException | IOException e) {
+            throw new LookupFailureException(e.getMessage(), e);
+        }
+
+        final Record lookupRecord;
+        Map<String, Object> recordMap = cache.get(key);
+        if (recordMap != null) {
+            List<RecordField> recordFields = new ArrayList<>(recordMap.size());
+            recordMap.forEach((k, v) -> recordFields.add(new RecordField(k, 
RecordFieldType.STRING.getDataType())));
+            final RecordSchema lookupRecordSchema = new 
SimpleRecordSchema(recordFields);
+            lookupRecord = new MapRecord(lookupRecordSchema, recordMap);
+        } else {
+            lookupRecord = null;
+        }
+
+        return Optional.ofNullable(lookupRecord);
+    }
+
+    @Override
+    public Set<String> getRequiredKeys() {
+        return REQUIRED_KEYS;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/63840377/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index ceaefa8..544ec91 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 org.apache.nifi.lookup.maxmind.IPLookupService
+org.apache.nifi.lookup.CSVRecordLookupService
 org.apache.nifi.lookup.PropertiesFileLookupService
 org.apache.nifi.lookup.SimpleKeyValueLookupService
 org.apache.nifi.lookup.SimpleCsvFileLookupService

http://git-wip-us.apache.org/repos/asf/nifi/blob/63840377/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestCSVRecordLookupService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestCSVRecordLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestCSVRecordLookupService.java
new file mode 100644
index 0000000..8d3f918
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestCSVRecordLookupService.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+public class TestCSVRecordLookupService {
+
+    private final static Optional<Record> EMPTY_RECORD = Optional.empty();
+
+    @Test
+    public void testSimpleCsvFileLookupService() throws 
InitializationException, IOException, LookupFailureException {
+        final TestRunner runner = 
TestRunners.newTestRunner(TestProcessor.class);
+        final CSVRecordLookupService service = new CSVRecordLookupService();
+
+        runner.addControllerService("csv-file-lookup-service", service);
+        runner.setProperty(service, CSVRecordLookupService.CSV_FILE, 
"src/test/resources/test.csv");
+        runner.setProperty(service, CSVRecordLookupService.CSV_FORMAT, 
"RFC4180");
+        runner.setProperty(service, CSVRecordLookupService.LOOKUP_KEY_COLUMN, 
"key");
+        runner.enableControllerService(service);
+        runner.assertValid(service);
+
+        final CSVRecordLookupService lookupService =
+                (CSVRecordLookupService) runner.getProcessContext()
+                        .getControllerServiceLookup()
+                        .getControllerService("csv-file-lookup-service");
+
+        assertThat(lookupService, instanceOf(LookupService.class));
+
+        final Optional<Record> property1 = 
lookupService.lookup(Collections.singletonMap("key", "property.1"));
+        assertEquals("this is property 1", 
property1.get().getAsString("value"));
+        assertEquals("2017-04-01", property1.get().getAsString("created_at"));
+
+        final Optional<Record> property2 = 
lookupService.lookup(Collections.singletonMap("key", "property.2"));
+        assertEquals("this is property 2", 
property2.get().getAsString("value"));
+        assertEquals("2017-04-02", property2.get().getAsString("created_at"));
+
+        final Optional<Record> property3 = 
lookupService.lookup(Collections.singletonMap("key", "property.3"));
+        assertEquals(EMPTY_RECORD, property3);
+    }
+
+}

Reply via email to