Repository: incubator-zeppelin Updated Branches: refs/heads/master 54f4420f9 -> 105538caa
[ZEPPELIN-817] Fix issue for queries containing a 'fields' parameter ### What is this PR for? It is a bug fix for search queries that use a fields parameter (cf. https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-fields.html). Previously, an error was displayed. ### What type of PR is it? Bug Fix ### Todos * [X] - ElasticsearchInterpreter : check if the result is in the '_source' part or in the 'fields' part of the response * [X] - Add a unit test for this case ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-817 ### How should this be tested? Use the elasticsearch interpreter and type a search query such as : search / {"fields":["date", "request.headers"],"query":{"match": {"status":404}}} Before the fix, there was an error : "Error : string is null" ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? NO * Are there breaking changes for older versions? NO * Does this need documentation? NO Author: Bruno Bonnin <[email protected]> Closes #845 from bbonnin/master and squashes the following commits: 4165517 [Bruno Bonnin] Update json-flatten version badb53c [Bruno Bonnin] Update doc for impact of fields param c78688a [Bruno Bonnin] Fix issue when a query contains 'fields' field Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/105538ca Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/105538ca Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/105538ca Branch: refs/heads/master Commit: 105538caa93cc932844a717bbe6974d37f49f619 Parents: 54f4420 Author: Bruno Bonnin <[email protected]> Authored: Thu Apr 21 09:39:11 2016 +0200 Committer: Felix Cheung <[email protected]> Committed: Fri Apr 22 19:53:47 2016 -0700 ---------------------------------------------------------------------- .../elasticsearch-query-with-fields-param.png | Bin 0 -> 179818 bytes 
docs/interpreter/elasticsearch.md | 3 + elasticsearch/pom.xml | 2 +- .../elasticsearch/ElasticsearchInterpreter.java | 101 +++++++++++++------ .../ElasticsearchInterpreterTest.java | 63 ++++++------ zeppelin-distribution/src/bin_license/LICENSE | 2 +- 6 files changed, 108 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/105538ca/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-query-with-fields-param.png ---------------------------------------------------------------------- diff --git a/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-query-with-fields-param.png b/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-query-with-fields-param.png new file mode 100644 index 0000000..bf62409 Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-query-with-fields-param.png differ http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/105538ca/docs/interpreter/elasticsearch.md ---------------------------------------------------------------------- diff --git a/docs/interpreter/elasticsearch.md b/docs/interpreter/elasticsearch.md index 7b70528..aa710d6 100644 --- a/docs/interpreter/elasticsearch.md +++ b/docs/interpreter/elasticsearch.md @@ -178,6 +178,9 @@ Examples: * With a JSON query:  +* With a JSON query containing a `fields` parameter (for filtering the fields in the response): in this case, all the fields values in the response are arrays, so, after flattening the result, the format of all the field names is `field_name[x]` + + * With a query string:  http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/105538ca/elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml index 3da4441..016d188 100644 --- a/elasticsearch/pom.xml +++ b/elasticsearch/pom.xml @@ -36,7 +36,7 @@ <properties> 
<elasticsearch.version>2.1.0</elasticsearch.version> <guava.version>18.0</guava.version> - <json-flattener.version>0.1.1</json-flattener.version> + <json-flattener.version>0.1.6</json-flattener.version> </properties> <dependencies> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/105538ca/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java index 50cf739..b05139f 100644 --- a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java +++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java @@ -17,10 +17,21 @@ package org.apache.zeppelin.elasticsearch; -import com.github.wnameless.json.flattener.JsonFlattener; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonParseException; +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.TreeSet; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import org.apache.commons.lang.StringUtils; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -39,6 +50,7 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHitField; import 
org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; @@ -48,9 +60,10 @@ import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.InetAddress; -import java.util.*; +import com.github.wnameless.json.flattener.JsonFlattener; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonParseException; /** @@ -80,7 +93,9 @@ public class ElasticsearchInterpreter extends Interpreter { private static final List<String> COMMANDS = Arrays.asList( "count", "delete", "get", "help", "index", "search"); - + + private static final Pattern FIELD_NAME_PATTERN = Pattern.compile("\\[\\\\\"(.+)\\\\\"\\](.*)"); + public static final String ELASTICSEARCH_HOST = "elasticsearch.host"; public static final String ELASTICSEARCH_PORT = "elasticsearch.port"; @@ -141,7 +156,7 @@ public class ElasticsearchInterpreter extends Interpreter { @Override public InterpreterResult interpret(String cmd, InterpreterContext interpreterContext) { logger.info("Run Elasticsearch command '" + cmd + "'"); - + if (StringUtils.isEmpty(cmd) || StringUtils.isEmpty(cmd.trim())) { return new InterpreterResult(InterpreterResult.Code.SUCCESS); } @@ -260,15 +275,15 @@ public class ElasticsearchInterpreter extends Interpreter { /** * Processes a "get" request. 
- * + * * @param urlItems Items of the URL * @return Result of the get request, it contains a JSON-formatted string */ private InterpreterResult processGet(String[] urlItems) { - if (urlItems.length != 3 - || StringUtils.isEmpty(urlItems[0]) - || StringUtils.isEmpty(urlItems[1]) + if (urlItems.length != 3 + || StringUtils.isEmpty(urlItems[0]) + || StringUtils.isEmpty(urlItems[1]) || StringUtils.isEmpty(urlItems[2])) { return new InterpreterResult(InterpreterResult.Code.ERROR, "Bad URL (it should be /index/type/id)"); @@ -285,13 +300,13 @@ public class ElasticsearchInterpreter extends Interpreter { InterpreterResult.Type.TEXT, json); } - + return new InterpreterResult(InterpreterResult.Code.ERROR, "Document not found"); } /** * Processes a "count" request. - * + * * @param urlItems Items of the URL * @param data May contains the JSON of the request * @return Result of the count request, it contains the total hits @@ -313,7 +328,7 @@ public class ElasticsearchInterpreter extends Interpreter { /** * Processes a "search" request. - * + * * @param urlItems Items of the URL * @param data May contains the JSON of the request * @param size Limit of result set @@ -325,7 +340,7 @@ public class ElasticsearchInterpreter extends Interpreter { return new InterpreterResult(InterpreterResult.Code.ERROR, "Bad URL (it should be /index1,index2,.../type1,type2,...)"); } - + final SearchResponse response = searchData(urlItems, data, size); return buildResponseMessage(response); @@ -333,18 +348,18 @@ public class ElasticsearchInterpreter extends Interpreter { /** * Processes a "index" request. 
- * + * * @param urlItems Items of the URL * @param data JSON to be indexed * @return Result of the index request, it contains the id of the document */ private InterpreterResult processIndex(String[] urlItems, String data) { - + if (urlItems.length < 2 || urlItems.length > 3) { return new InterpreterResult(InterpreterResult.Code.ERROR, "Bad URL (it should be /index/type or /index/type/id)"); } - + final IndexResponse response = client .prepareIndex(urlItems[0], urlItems[1], urlItems.length == 2 ? null : urlItems[2]) .setSource(data) @@ -358,15 +373,15 @@ public class ElasticsearchInterpreter extends Interpreter { /** * Processes a "delete" request. - * + * * @param urlItems Items of the URL * @return Result of the delete request, it contains the id of the deleted document */ private InterpreterResult processDelete(String[] urlItems) { - if (urlItems.length != 3 - || StringUtils.isEmpty(urlItems[0]) - || StringUtils.isEmpty(urlItems[1]) + if (urlItems.length != 3 + || StringUtils.isEmpty(urlItems[0]) + || StringUtils.isEmpty(urlItems[1]) || StringUtils.isEmpty(urlItems[2])) { return new InterpreterResult(InterpreterResult.Code.ERROR, "Bad URL (it should be /index/type/id)"); @@ -375,23 +390,23 @@ public class ElasticsearchInterpreter extends Interpreter { final DeleteResponse response = client .prepareDelete(urlItems[0], urlItems[1], urlItems[2]) .get(); - + if (response.isFound()) { return new InterpreterResult( InterpreterResult.Code.SUCCESS, InterpreterResult.Type.TEXT, response.getId()); } - + return new InterpreterResult(InterpreterResult.Code.ERROR, "Document not found"); } - + private SearchResponse searchData(String[] urlItems, String query, int size) { final SearchRequestBuilder reqBuilder = new SearchRequestBuilder( client, SearchAction.INSTANCE); reqBuilder.setIndices(); - + if (urlItems.length >= 1) { reqBuilder.setIndices(StringUtils.split(urlItems[0], ",")); } @@ -452,18 +467,42 @@ public class ElasticsearchInterpreter extends Interpreter { } private 
String buildSearchHitsResponseMessage(SearchHit[] hits) { - + if (hits == null || hits.length == 0) { return ""; } //First : get all the keys in order to build an ordered list of the values for each hit // + final Map<String, Object> hitFields = new HashMap<>(); final List<Map<String, Object>> flattenHits = new LinkedList<>(); final Set<String> keys = new TreeSet<>(); for (SearchHit hit : hits) { - final String json = hit.getSourceAsString(); - final Map<String, Object> flattenMap = JsonFlattener.flattenAsMap(json); + // Fields can be found either in _source, or in fields (it depends on the query) + // + String json = hit.getSourceAsString(); + if (json == null) { + hitFields.clear(); + for (SearchHitField hitField : hit.getFields().values()) { + hitFields.put(hitField.getName(), hitField.getValues()); + } + json = gson.toJson(hitFields); + } + + final Map<String, Object> flattenJsonMap = JsonFlattener.flattenAsMap(json); + final Map<String, Object> flattenMap = new HashMap<>(); + for (Iterator<String> iter = flattenJsonMap.keySet().iterator(); iter.hasNext(); ) { + // Replace keys that match a format like that : [\"keyname\"][0] + final String fieldName = iter.next(); + final Matcher fieldNameMatcher = FIELD_NAME_PATTERN.matcher(fieldName); + if (fieldNameMatcher.matches()) { + flattenMap.put(fieldNameMatcher.group(1) + fieldNameMatcher.group(2), + flattenJsonMap.get(fieldName)); + } + else { + flattenMap.put(fieldName, flattenJsonMap.get(fieldName)); + } + } flattenHits.add(flattenMap); for (String key : flattenMap.keySet()) { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/105538ca/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java 
b/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java index 42f08ad..35f683f 100644 --- a/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java +++ b/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java @@ -17,6 +17,15 @@ package org.apache.zeppelin.elasticsearch; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Date; +import java.util.Properties; +import java.util.UUID; + import org.apache.commons.lang.math.RandomUtils; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; @@ -29,21 +38,12 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import java.io.IOException; -import java.util.Arrays; -import java.util.Date; -import java.util.Properties; -import java.util.UUID; - -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.junit.Assert.assertEquals; - public class ElasticsearchInterpreterTest { - + private static Client elsClient; private static Node elsNode; private static ElasticsearchInterpreter interpreter; - + private static final String[] METHODS = { "GET", "PUT", "DELETE", "POST" }; private static final int[] STATUS = { 200, 404, 500, 403 }; @@ -75,7 +75,7 @@ public class ElasticsearchInterpreterTest { .field("type", "integer") .endObject() .endObject().endObject().endObject()).get(); - + for (int i = 0; i < 50; i++) { elsClient.prepareIndex("logs", "http", "" + i) .setRefresh(true) @@ -100,7 +100,7 @@ public class ElasticsearchInterpreterTest { interpreter = new ElasticsearchInterpreter(props); interpreter.open(); } - + @AfterClass public static void clean() { if (interpreter != null) { @@ -116,41 +116,44 @@ public class 
ElasticsearchInterpreterTest { elsNode.close(); } } - + @Test public void testCount() { - + InterpreterResult res = interpreter.interpret("count /unknown", null); assertEquals(Code.ERROR, res.code()); - + res = interpreter.interpret("count /logs", null); assertEquals("50", res.message()); } - + @Test public void testGet() { - + InterpreterResult res = interpreter.interpret("get /logs/http/unknown", null); assertEquals(Code.ERROR, res.code()); - + res = interpreter.interpret("get /logs/http/10", null); assertEquals(Code.SUCCESS, res.code()); } - + @Test public void testSearch() { - + InterpreterResult res = interpreter.interpret("size 10\nsearch /logs *", null); assertEquals(Code.SUCCESS, res.code()); - + res = interpreter.interpret("search /logs {{{hello}}}", null); assertEquals(Code.ERROR, res.code()); - + res = interpreter.interpret("search /logs { \"query\": { \"match\": { \"status\": 500 } } }", null); assertEquals(Code.SUCCESS, res.code()); res = interpreter.interpret("search /logs status:404", null); - assertEquals(Code.SUCCESS, res.code()); + assertEquals(Code.SUCCESS, res.code()); + + res = interpreter.interpret("search /logs { \"fields\": [ \"date\", \"request.headers\" ], \"query\": { \"match\": { \"status\": 500 } } }", null); + assertEquals(Code.SUCCESS, res.code()); } @Test @@ -177,23 +180,23 @@ public class ElasticsearchInterpreterTest { " { \"terms\" : { \"field\" : \"status\" } } } }", null); assertEquals(Code.SUCCESS, res.code()); } - + @Test public void testIndex() { - + InterpreterResult res = interpreter.interpret("index /logs { \"date\": \"" + new Date() + "\", \"method\": \"PUT\", \"status\": \"500\" }", null); assertEquals(Code.ERROR, res.code()); - + res = interpreter.interpret("index /logs/http { \"date\": \"2015-12-06T14:54:23.368Z\", \"method\": \"PUT\", \"status\": \"500\" }", null); assertEquals(Code.SUCCESS, res.code()); } - + @Test public void testDelete() { - + InterpreterResult res = interpreter.interpret("delete 
/logs/http/unknown", null); assertEquals(Code.ERROR, res.code()); - + res = interpreter.interpret("delete /logs/http/11", null); assertEquals("11", res.message()); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/105538ca/zeppelin-distribution/src/bin_license/LICENSE ---------------------------------------------------------------------- diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE index 90796a1..09f98ad 100644 --- a/zeppelin-distribution/src/bin_license/LICENSE +++ b/zeppelin-distribution/src/bin_license/LICENSE @@ -73,7 +73,7 @@ The following components are provided under Apache License. (Apache 2.0) Jackson-dataformat-CBOR (com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.6.2 - http://wiki.fasterxml.com/JacksonForCbor) (Apache 2.0) Jackson-dataformat-Smile (com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.6.2 - http://wiki.fasterxml.com/JacksonForSmile) (Apache 2.0) Jackson-dataformat-YAML (com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.6.2 - https://github.com/FasterXML/jackson) - (Apache 2.0) json-flattener (com.github.wnameless:json-flattener:0.1.1 - https://github.com/wnameless/json-flattener) + (Apache 2.0) json-flattener (com.github.wnameless:json-flattener:0.1.6 - https://github.com/wnameless/json-flattener) (Apache 2.0) Spatial4J (com.spatial4j:spatial4j:0.4.1 - https://github.com/spatial4j/spatial4j) (Apache 2.0) T-Digest (com.tdunning:t-digest:3.0 - https://github.com/tdunning/t-digest) (Apache 2.0) Netty (io.netty:netty:3.10.5.Final - http://netty.io/)
