GitHub user nickwallen commented on a diff in the pull request:
https://github.com/apache/metron/pull/832#discussion_r149197184
--- Diff:
metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchSubmitter.java
---
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.elasticsearch.action.search.SearchPhaseExecutionException;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.rest.RestStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+
+/**
+ * Responsible for submitting searches to Elasticsearch.
+ */
+public class ElasticsearchSearchSubmitter {
+
+  /** Logger for search submission activity. */
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** Client through which every search request is dispatched. */
+  private TransportClient client;
+
+  /**
+   * Creates a submitter that dispatches searches through the given client.
+   *
+   * @param client The Elasticsearch transport client to search with.
+   */
+  public ElasticsearchSearchSubmitter(TransportClient client) {
+    this.client = client;
+  }
+
+ /**
+ * Submit a search to Elasticsearch.
+ * @param request A search request.
+ * @return The search response.
+ */
+  /**
+   * Submit a search to Elasticsearch.
+   *
+   * <p>If the response reports failed shards, they are passed to {@code handleShardFailures}.
+   * The response is returned only when its status is {@link RestStatus#OK}.
+   *
+   * @param request A search request.
+   * @return The search response.
+   * @throws InvalidSearchException If Elasticsearch fails to execute the search, or if the
+   *     response status is anything other than OK.
+   */
+  public SearchResponse submitSearch(SearchRequest request) throws InvalidSearchException {
+    // serializing the request to JSON is not free; only do it when it will actually be logged
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("About to submit a search; request={}", ElasticsearchUtils.toJSON(request));
+    }
+
+    // submit the search request
+    SearchResponse esResponse;
+    try {
+      esResponse = client
+              .search(request)
+              .actionGet();
+      // pass the object itself so SLF4J only calls toString() when debug is enabled
+      LOG.debug("Got Elasticsearch response; response={}", esResponse);
+
+    } catch (SearchPhaseExecutionException e) {
+      String msg = String.format(
+              "Failed to execute search; error='%s', search='%s'",
+              ExceptionUtils.getRootCauseMessage(e),
+              ElasticsearchUtils.toJSON(request));
+      LOG.error(msg, e);
+      throw new InvalidSearchException(msg, e);
+    }
+
+    // check for shard failures; these can occur even when the overall status is OK
+    if (esResponse.getFailedShards() > 0) {
+      handleShardFailures(request, esResponse);
+    }
+
+    // validate the response status; anything other than OK fails the search
+    if (RestStatus.OK != esResponse.status()) {
+      String msg = String.format(
+              "Bad search response; status=%s, timeout=%s, terminatedEarly=%s",
+              esResponse.status(), esResponse.isTimedOut(), esResponse.isTerminatedEarly());
+      LOG.error(msg);
+      throw new InvalidSearchException(msg);
+    }
+
+    return esResponse;
+  }
+
+ /**
+ * Handle individual shard failures that can occur even when the
response is OK. These
+ * can indicate misconfiguration of the search indices.
+ * @param request The search request.
+ * @param response The search response.
+ */
+ private void handleShardFailures(
+ org.elasticsearch.action.search.SearchRequest request,
+ org.elasticsearch.action.search.SearchResponse response) {
+ /*
+ * shard failures are only logged. the search itself is not failed.
this approach
+ * assumes that a user is interested in partial search results, even
if the
+ * entire search result set cannot be produced.
+ *
+ * for example, assume the user adds an additional sensor and the
telemetry
+ * is indexed into a new search index. if that search index is
misconfigured,
+ * it can result in partial shard failures. rather than failing the
entire search,
+ * we log the error and allow the results to be returned from shards
that
+ * are correctly configured.
--- End diff --
Hopefully this explanation makes sense. When some shards fail, we do not fail
fast; instead, we log the problem and return the partial results to the user.
I am torn as to whether this is the right approach.
---