Github user justinleet commented on a diff in the pull request:
https://github.com/apache/metron/pull/911#discussion_r164840108
--- Diff:
metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrSearchDao.java
---
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.solr.dao;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.indexing.dao.AccessConfig;
+import org.apache.metron.indexing.dao.search.GetRequest;
+import org.apache.metron.indexing.dao.search.Group;
+import org.apache.metron.indexing.dao.search.GroupOrder;
+import org.apache.metron.indexing.dao.search.GroupOrderType;
+import org.apache.metron.indexing.dao.search.GroupRequest;
+import org.apache.metron.indexing.dao.search.GroupResponse;
+import org.apache.metron.indexing.dao.search.GroupResult;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.search.SearchDao;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.search.SearchResult;
+import org.apache.metron.indexing.dao.search.SortField;
+import org.apache.metron.indexing.dao.search.SortOrder;
+import org.apache.metron.indexing.dao.update.Document;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrQuery.ORDER;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.response.FacetField;
+import org.apache.solr.client.solrj.response.FacetField.Count;
+import org.apache.solr.client.solrj.response.PivotField;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SolrSearchDao implements SearchDao {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private transient SolrClient client;
+ private AccessConfig accessConfig;
+
+ public SolrSearchDao(SolrClient client, AccessConfig accessConfig) {
+ this.client = client;
+ this.accessConfig = accessConfig;
+ }
+
+ @Override
+ public SearchResponse search(SearchRequest searchRequest) throws
InvalidSearchException {
+ if (searchRequest.getQuery() == null) {
+ throw new InvalidSearchException("Search query is invalid: null");
+ }
+ if (client == null) {
+ throw new InvalidSearchException("Uninitialized Dao! You must call
init() prior to use.");
+ }
+ if (searchRequest.getSize() > accessConfig.getMaxSearchResults()) {
+ throw new InvalidSearchException(
+ "Search result size must be less than " +
accessConfig.getMaxSearchResults());
+ }
+ SolrQuery query = buildSearchRequest(searchRequest);
+ try {
+ QueryResponse response = client.query(query);
+ return buildSearchResponse(searchRequest, response);
+ } catch (IOException | SolrServerException e) {
+ String msg = e.getMessage();
+ LOG.error(msg, e);
+ throw new InvalidSearchException(msg, e);
+ }
+ }
+
+ @Override
+ public GroupResponse group(GroupRequest groupRequest) throws
InvalidSearchException {
+ String groupNames =
groupRequest.getGroups().stream().map(Group::getField).collect(
+ Collectors.joining(","));
+ SolrQuery query = new SolrQuery()
+ .setStart(0)
+ .setRows(0)
+ .setQuery("*:*");
+ query.set("collection", "bro,snort");
+ Optional<String> scoreField = groupRequest.getScoreField();
+ if (scoreField.isPresent()) {
+ query.set("stats", true);
+ query.set("stats.field", String.format("{!tag=piv1 sum=true}%s",
scoreField.get()));
+ }
+ query.set("facet", true);
+ query.set("facet.pivot", String.format("{!stats=piv1}%s", groupNames));
+ try {
+ QueryResponse response = client.query(query);
+ return buildGroupResponse(groupRequest, response);
+ } catch (IOException | SolrServerException e) {
+ String msg = e.getMessage();
+ LOG.error(msg, e);
+ throw new InvalidSearchException(msg, e);
+ }
+ }
+
+ @Override
+ public Document getLatest(String guid, String collection) throws
IOException {
+ try {
+ SolrDocument solrDocument = client.getById(collection, guid);
+ return toDocument(solrDocument);
+ } catch (SolrServerException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public Iterable<Document> getAllLatest(List<GetRequest> getRequests)
throws IOException {
+ Map<String, Collection<String>> collectionIdMap = new HashMap<>();
+ for (GetRequest getRequest: getRequests) {
+ Collection<String> ids =
collectionIdMap.get(getRequest.getSensorType());
+ if (ids == null) {
+ ids = new HashSet<>();
+ }
+ ids.add(getRequest.getGuid());
+ collectionIdMap.put(getRequest.getSensorType(), ids);
+ }
+ try {
+ List<Document> documents = new ArrayList<>();
+ for (String collection: collectionIdMap.keySet()) {
+ SolrDocumentList solrDocumentList =
client.getById(collectionIdMap.get(collection),
+ new SolrQuery().set("collection", collection));
+
documents.addAll(solrDocumentList.stream().map(this::toDocument).collect(Collectors.toList()));
+ }
+ return documents;
+ } catch (SolrServerException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private SolrQuery buildSearchRequest(
+ SearchRequest searchRequest) throws InvalidSearchException {
+ SolrQuery query = new SolrQuery()
+ .setStart(searchRequest.getFrom())
+ .setRows(searchRequest.getSize())
+ .setQuery(searchRequest.getQuery());
+
+ // handle sort fields
+ for (SortField sortField : searchRequest.getSort()) {
+ query.addSort(sortField.getField(),
getSolrSortOrder(sortField.getSortOrder()));
+ }
+
+ // handle search fields
+ Optional<List<String>> fields = searchRequest.getFields();
+ if (fields.isPresent()) {
+ fields.get().forEach(query::addField);
+ }
+
+ //handle facet fields
+ Optional<List<String>> facetFields = searchRequest.getFacetFields();
+ if (facetFields.isPresent()) {
+ facetFields.get().forEach(query::addFacetField);
+ }
+
+ String collections =
searchRequest.getIndices().stream().collect(Collectors.joining(","));
+ query.set("collection", collections);
+
+ return query;
+ }
+
+ private SolrQuery.ORDER getSolrSortOrder(
+ SortOrder sortOrder) {
+ return sortOrder == SortOrder.DESC ?
+ ORDER.desc : ORDER.asc;
+ }
+
+ private SearchResponse buildSearchResponse(
+ SearchRequest searchRequest,
+ QueryResponse solrResponse) throws InvalidSearchException {
+
+ SearchResponse searchResponse = new SearchResponse();
+ SolrDocumentList solrDocumentList = solrResponse.getResults();
+ searchResponse.setTotal(solrDocumentList.getNumFound());
+
+ // search hits --> search results
+ List<SearchResult> results = solrDocumentList.stream()
+ .map(solrDocument -> getSearchResult(solrDocument,
searchRequest.getFields()))
+ .collect(Collectors.toList());
+ searchResponse.setResults(results);
+
+ // handle facet fields
+ Optional<List<String>> facetFields = searchRequest.getFacetFields();
+ if (facetFields.isPresent()) {
+ searchResponse.setFacetCounts(getFacetCounts(facetFields.get(),
solrResponse));
+ }
+
+ if (LOG.isDebugEnabled()) {
+ String response;
+ try {
+ response = JSONUtils.INSTANCE.toJSON(searchResponse, false);
+ } catch (JsonProcessingException e) {
+ response = "???";
+ }
+
+ LOG.debug("Built search response; response={}", response);
+ }
+ return searchResponse;
+ }
+
+ private SearchResult getSearchResult(SolrDocument solrDocument,
Optional<List<String>> fields) {
+ SearchResult searchResult = new SearchResult();
+ searchResult.setId((String)
solrDocument.getFieldValue(SolrDao.ID_FIELD));
+ Map<String, Object> source;
+ if (fields.isPresent()) {
+ source = new HashMap<>();
+ fields.get().forEach(field -> source.put(field,
solrDocument.getFieldValue(field)));
+ } else {
+ source = solrDocument.getFieldValueMap();
+ }
+ searchResult.setSource(source);
+ return searchResult;
+ }
+
+ private Map<String, Map<String, Long>> getFacetCounts(List<String>
fields,
+ QueryResponse solrResponse) {
+ Map<String, Map<String, Long>> fieldCounts = new HashMap<>();
+ for (String field : fields) {
+ Map<String, Long> valueCounts = new HashMap<>();
+ FacetField facetField = solrResponse.getFacetField(field);
+ for (Count facetCount : facetField.getValues()) {
+ valueCounts.put(facetCount.getName(), facetCount.getCount());
+ }
+ fieldCounts.put(field, valueCounts);
+ }
+ return fieldCounts;
+ }
+
+ /**
+ * Build a group response.
+ * @param groupRequest The original group request.
+ * @param response The search response.
+ * @return A group response.
+ * @throws InvalidSearchException
+ */
+ private GroupResponse buildGroupResponse(
--- End diff --
Again, exception actually thrown?
---