GitHub user nickwallen commented on a diff in the pull request: https://github.com/apache/metron/pull/832#discussion_r149197184 --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchSubmitter.java --- @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.elasticsearch.dao; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; +import org.apache.metron.indexing.dao.search.InvalidSearchException; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.rest.RestStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; + +/** + * Responsible for submitting searches to Elasticsearch. 
+ */ +public class ElasticsearchSearchSubmitter { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * The Elasticsearch client. + */ + private TransportClient client; + + public ElasticsearchSearchSubmitter(TransportClient client) { + this.client = client; + } + + /** + * Submit a search to Elasticsearch. + * @param request A search request. + * @return The search response. + */ + public SearchResponse submitSearch(SearchRequest request) throws InvalidSearchException { + LOG.debug("About to submit a search; request={}", ElasticsearchUtils.toJSON(request)); + + // submit the search request + org.elasticsearch.action.search.SearchResponse esResponse; + try { + esResponse = client + .search(request) + .actionGet(); + LOG.debug("Got Elasticsearch response; response={}", esResponse.toString()); + + } catch (SearchPhaseExecutionException e) { + String msg = String.format( + "Failed to execute search; error='%s', search='%s'", + ExceptionUtils.getRootCauseMessage(e), + ElasticsearchUtils.toJSON(request)); + LOG.error(msg, e); + throw new InvalidSearchException(msg, e); + } + + // check for shard failures + if(esResponse.getFailedShards() > 0) { + handleShardFailures(request, esResponse); + } + + // validate the response status + if(RestStatus.OK == esResponse.status()) { + return esResponse; + + } else { + // the search was not successful + String msg = String.format( + "Bad search response; status=%s, timeout=%s, terminatedEarly=%s", + esResponse.status(), esResponse.isTimedOut(), esResponse.isTerminatedEarly()); + LOG.error(msg); + throw new InvalidSearchException(msg); + } + } + + /** + * Handle individual shard failures that can occur even when the response is OK. These + * can indicate misconfiguration of the search indices. + * @param request The search request. + * @param response The search response. 
+ */ + private void handleShardFailures( + org.elasticsearch.action.search.SearchRequest request, + org.elasticsearch.action.search.SearchResponse response) { + /* + * shard failures are only logged. the search itself is not failed. this approach + * assumes that a user is interested in partial search results, even if the + * entire search result set cannot be produced. + * + * for example, assume the user adds an additional sensor and the telemetry + * is indexed into a new search index. if that search index is misconfigured, + * it can result in partial shard failures. rather than failing the entire search, + * we log the error and allow the results to be returned from shards that + * are correctly configured. --- End diff -- Hopefully this explanation makes sense. When there are partial shard failures, we do not fail fast. We log the problem and return partial results to the user. I am torn as to whether this is the right approach.
---