[ https://issues.apache.org/jira/browse/NIFI-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16456445#comment-16456445 ]
ASF GitHub Bot commented on NIFI-5051: -------------------------------------- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2615#discussion_r184689786 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.lookup.LookupService; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.apache.nifi.serialization.record.type.RecordDataType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class ElasticSearchLookupService extends AbstractControllerService implements LookupService { + public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder() + .name("el-rest-client-service") + .displayName("Client Service") + .description("An ElasticSearch client service to use for running queries.") + .identifiesControllerService(ElasticSearchClientService.class) + .required(true) + .build(); + public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder() + .name("el-lookup-index") + .displayName("Index") + .description("The name of the index to read from") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder() + .name("el-lookup-type") + .displayName("Type") + .description("The type of this document (used by Elasticsearch for indexing and searching)") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder() + .name("el-lookup-schema-registry") + .displayName("Schema Registry") + .description("If specified, this avro schema will be used for all objects loaded from MongoDB using this service. If left blank, " + + "the service will attempt to determine the schema from the results.") + .required(false) + .identifiesControllerService(SchemaRegistry.class) + .build(); + + public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder() + .name("el-lookup-record-schema-name") + .displayName("Record Schema Name") + .description("If specified, the value will be used to lookup a schema in the configured schema registry.") + .required(false) + .addValidator(Validator.VALID) + .build(); + + static final List<PropertyDescriptor> lookupDescriptors; + + static { + List<PropertyDescriptor> _desc = new ArrayList<>(); + _desc.add(CLIENT_SERVICE); + _desc.add(INDEX); + _desc.add(TYPE); + _desc.add(SCHEMA_REGISTRY); + _desc.add(RECORD_SCHEMA_NAME); + + lookupDescriptors = Collections.unmodifiableList(_desc); + } + + private ElasticSearchClientService clientService; + + private String index; + private String type; + private ObjectMapper mapper; + private RecordSchema recordSchema; + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + List<ValidationResult> problems = new ArrayList<>(); + + PropertyValue registry = validationContext.getProperty(SCHEMA_REGISTRY); + PropertyValue schemaName = validationContext.getProperty(RECORD_SCHEMA_NAME); + + if (registry.isSet() && !schemaName.isSet()) { + problems.add(new ValidationResult.Builder() + .explanation("If the registry is set, the schema name parameter must be set too.") + .build()); + } else if (!registry.isSet() && schemaName.isSet()) { + problems.add(new ValidationResult.Builder() + .explanation("If the schema name is set, the schema registry parameter must be set too.") + .build()); + } + + return problems; + } + + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException { + clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class); + index = context.getProperty(INDEX).getValue(); + type = context.getProperty(TYPE).getValue(); + mapper = new ObjectMapper(); + + SchemaRegistry registry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class); + final String name = context.getProperty(RECORD_SCHEMA_NAME).getValue(); + if (registry != null) { + try { + SchemaIdentifier identifier = SchemaIdentifier.builder() + .name(name) + .build(); + recordSchema = registry.retrieveSchema(identifier); + } catch (Exception ex) { + getLogger().error(String.format("Could not find schema named %s", name), ex); + throw new InitializationException(ex); + } + } + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return lookupDescriptors; + } + + @Override + public Optional lookup(Map coordinates) throws LookupFailureException { + validateCoordinates(coordinates); + + try { + Record record; + if (coordinates.containsKey("_id")) { + record = getById((String)coordinates.get("_id")); + } else { + record = getByQuery((String)coordinates.get("query")); + } + + return record == null ? Optional.empty() : Optional.of(record); + } catch (IOException ex) { + getLogger().error("Error during lookup.", ex); + throw new LookupFailureException(ex); + } + } + + private void validateCoordinates(Map coordinates) throws LookupFailureException { + List<String> reasons = new ArrayList<>(); + + if (coordinates.containsKey("_id") && !(coordinates.get("_id") instanceof String)) { + reasons.add("_id was supplied, but it was not a String."); + } else if (coordinates.containsKey("query") && !(coordinates.get("query") instanceof String)) { + reasons.add("query was supplied, but it was not a String."); + } else if (!coordinates.containsKey("_id") && !coordinates.containsKey("query")) { + reasons.add("Either \"_id\" or \"query\" must be supplied as keys to lookup(Map)"); + } else if (coordinates.containsKey("_id") && coordinates.containsKey("query")) { + reasons.add("\"_id\" and \"query\" cannot be used at the same time as keys."); + } + + if (reasons.size() > 0) { + String error = String.join("\n", reasons); + throw new LookupFailureException(error); + } + } + + private Record getById(final String _id) throws IOException, LookupFailureException { + Map<String, Object> query = new HashMap<String, Object>(){{ + put("query", new HashMap<String, Object>() {{ + put("match", new HashMap<String, String>(){{ + put("_id", _id); + }}); + }}); + }}; + + String json = mapper.writeValueAsString(query); + + SearchResponse response = clientService.search(json, index, type); + + if (response.getNumberOfHits() > 1) { + throw new LookupFailureException(String.format("Expected 1 response, got %d for query %s", + response.getNumberOfHits(), json)); + } else if (response.getNumberOfHits() == 0) { + return null; + } + + final Map<String, Object> source = (Map)response.getHits().get(0).get("_source"); + + RecordSchema toUse = recordSchema != null ? recordSchema : convertSchema(source); + + return new MapRecord(toUse, source); + } + + private Record getByQuery(final String query) throws LookupFailureException { + Map<String, Object> parsed; + try { + parsed = mapper.readValue(query, Map.class); + parsed.remove("from"); + parsed.remove("aggs"); + parsed.put("size", 1); + + final String json = mapper.writeValueAsString(parsed); + + SearchResponse response = clientService.search(json, index, type); + + if (response.getNumberOfHits() == 0) { + return null; + } else { + final Map<String, Object> source = (Map)response.getHits().get(0).get("_source"); + RecordSchema toUse = recordSchema != null ? recordSchema : convertSchema(source); + return new MapRecord(toUse, source); + } + + } catch (IOException e) { + throw new LookupFailureException(e); + } + } + + private RecordSchema convertSchema(Map<String, Object> result) { --- End diff -- I agree. I just wrote up [a Jira ticket](https://issues.apache.org/jira/browse/NIFI-5127) for this. Let's table it for now because we need to think about things like date strings. > Create a LookupService that uses ElasticSearch > ---------------------------------------------- > > Key: NIFI-5051 > URL: https://issues.apache.org/jira/browse/NIFI-5051 > Project: Apache NiFi > Issue Type: New Feature > Reporter: Mike Thomsen > Assignee: Mike Thomsen > Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)