Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2615#discussion_r189727653 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java --- @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.elasticsearch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.lookup.LookupService; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.SchemaRegistryService; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.RecordDataType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +public class ElasticSearchLookupService extends SchemaRegistryService implements LookupService { + public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder() + .name("el-rest-client-service") + .displayName("Client Service") + .description("An ElasticSearch client service to use for running queries.") + .identifiesControllerService(ElasticSearchClientService.class) + .required(true) + .build(); + public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder() + .name("el-lookup-index") + .displayName("Index") + .description("The name of the index to read 
from") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder() + .name("el-lookup-type") + .displayName("Type") + .description("The type of this document (used by Elasticsearch for indexing and searching)") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + + public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder() + .name("el-lookup-record-schema-name") + .displayName("Record Schema Name") + .description("If specified, the value will be used to lookup a schema in the configured schema registry.") + .required(false) + .addValidator(Validator.VALID) + .build(); + + private ElasticSearchClientService clientService; + + private String index; + private String type; + private ObjectMapper mapper; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException { + clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class); + index = context.getProperty(INDEX).getValue(); + type = context.getProperty(TYPE).getValue(); + mapper = new ObjectMapper(); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + List<PropertyDescriptor> _desc = new ArrayList<>(); + _desc.addAll(super.getSupportedPropertyDescriptors()); + _desc.add(CLIENT_SERVICE); + _desc.add(INDEX); + _desc.add(TYPE); + _desc.add(RECORD_SCHEMA_NAME); + + return Collections.unmodifiableList(_desc); + } + + @Override + public Optional lookup(Map coordinates) throws LookupFailureException { --- End diff -- For nested, it could be trickier. 
I think this would work:

```
/user/email => "user.contact.email"
```

```
{
  "query": {
    "nested": {
      "path": "user.contact",
      "query": {
        "match": {
          "email": "john.sm...@company.com"
        }
      }
    }
  }
}
```
---