Change Json parsing from gson to jackson for ElasticsearchIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d5257658 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d5257658 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d5257658 Branch: refs/heads/release-0.6.0 Commit: d5257658f094fe8c2a8668027bbdd4a26396ba0b Parents: 8ab36fa Author: Ismaël Mejía <ieme...@gmail.com> Authored: Mon Mar 6 09:13:31 2017 +0100 Committer: Ahmet Altay <al...@google.com> Committed: Wed Mar 8 13:38:39 2017 -0800 ---------------------------------------------------------------------- sdks/java/io/elasticsearch/pom.xml | 7 +- .../sdk/io/elasticsearch/ElasticsearchIO.java | 102 +++++++++---------- 2 files changed, 52 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d5257658/sdks/java/io/elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/pom.xml b/sdks/java/io/elasticsearch/pom.xml index 3279dfd..5ea4452 100644 --- a/sdks/java/io/elasticsearch/pom.xml +++ b/sdks/java/io/elasticsearch/pom.xml @@ -47,9 +47,8 @@ </dependency> <dependency> - <groupId>com.google.code.gson</groupId> - <artifactId>gson</artifactId> - <version>2.6.2</version> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> </dependency> <dependency> @@ -116,7 +115,7 @@ </dependency> <dependency> - <groupId>org.apache.commons</groupId> + <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>1.3.2</version> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/beam/blob/d5257658/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java 
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 5073834..b08cb24 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -20,17 +20,13 @@ package org.apache.beam.sdk.io.elasticsearch; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; -import com.google.gson.Gson; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.io.Serializable; import java.net.MalformedURLException; import java.net.URL; @@ -38,11 +34,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.ListIterator; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; @@ -140,11 +136,10 @@ public class ElasticsearchIO { private ElasticsearchIO() {} - private static JsonObject parseResponse(Response response) throws IOException { - InputStream content = response.getEntity().getContent(); - InputStreamReader inputStreamReader = new InputStreamReader(content, "UTF-8"); - JsonObject jsonObject = new Gson().fromJson(inputStreamReader, JsonObject.class); - return jsonObject; + private static final ObjectMapper mapper = new ObjectMapper(); + + private static JsonNode parseResponse(Response response) 
throws IOException { + return mapper.readValue(response.getEntity().getContent(), JsonNode.class); } /** A POJO describing a connection configuration to Elasticsearch. */ @@ -428,23 +423,24 @@ public class ElasticsearchIO { // But, as each shard (replica or primary) is responsible for only one part of the data, // there will be no duplicate. - JsonObject statsJson = getStats(true); - JsonObject shardsJson = + JsonNode statsJson = getStats(true); + JsonNode shardsJson = statsJson - .getAsJsonObject("indices") - .getAsJsonObject(spec.getConnectionConfiguration().getIndex()) - .getAsJsonObject("shards"); - Set<Map.Entry<String, JsonElement>> shards = shardsJson.entrySet(); - for (Map.Entry<String, JsonElement> shardJson : shards) { + .path("indices") + .path(spec.getConnectionConfiguration().getIndex()) + .path("shards"); + + Iterator<Map.Entry<String, JsonNode>> shards = shardsJson.fields(); + while (shards.hasNext()) { + Map.Entry<String, JsonNode> shardJson = shards.next(); String shardId = shardJson.getKey(); - JsonArray value = (JsonArray) shardJson.getValue(); + JsonNode value = (JsonNode) shardJson.getValue(); boolean isPrimaryShard = value - .get(0) - .getAsJsonObject() - .getAsJsonObject("routing") - .getAsJsonPrimitive("primary") - .getAsBoolean(); + .path(0) + .path("routing") + .path("primary") + .asBoolean(); if (isPrimaryShard) { sources.add(new BoundedElasticsearchSource(spec, shardId)); } @@ -463,14 +459,14 @@ public class ElasticsearchIO { // NB: Elasticsearch 5.x now provides the slice API. 
// (https://www.elastic.co/guide/en/elasticsearch/reference/5.0/search-request-scroll.html // #sliced-scroll) - JsonObject statsJson = getStats(false); - JsonObject indexStats = + JsonNode statsJson = getStats(false); + JsonNode indexStats = statsJson - .getAsJsonObject("indices") - .getAsJsonObject(spec.getConnectionConfiguration().getIndex()) - .getAsJsonObject("primaries"); - JsonObject store = indexStats.getAsJsonObject("store"); - return store.getAsJsonPrimitive("size_in_bytes").getAsLong(); + .path("indices") + .path(spec.getConnectionConfiguration().getIndex()) + .path("primaries"); + JsonNode store = indexStats.path("store"); + return store.path("size_in_bytes").asLong(); } @Override @@ -494,7 +490,7 @@ public class ElasticsearchIO { return StringUtf8Coder.of(); } - private JsonObject getStats(boolean shardLevel) throws IOException { + private JsonNode getStats(boolean shardLevel) throws IOException { HashMap<String, String> params = new HashMap<>(); if (shardLevel) { params.put("level", "shards"); @@ -544,13 +540,13 @@ public class ElasticsearchIO { HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON); response = restClient.performRequest("GET", endPoint, params, queryEntity, new BasicHeader("", "")); - JsonObject searchResult = parseResponse(response); + JsonNode searchResult = parseResponse(response); updateScrollId(searchResult); return readNextBatchAndReturnFirstDocument(searchResult); } - private void updateScrollId(JsonObject searchResult) { - scrollId = searchResult.getAsJsonPrimitive("_scroll_id").getAsString(); + private void updateScrollId(JsonNode searchResult) { + scrollId = searchResult.path("_scroll_id").asText(); } @Override @@ -571,15 +567,15 @@ public class ElasticsearchIO { Collections.<String, String>emptyMap(), scrollEntity, new BasicHeader("", "")); - JsonObject searchResult = parseResponse(response); + JsonNode searchResult = parseResponse(response); updateScrollId(searchResult); return 
readNextBatchAndReturnFirstDocument(searchResult); } } - private boolean readNextBatchAndReturnFirstDocument(JsonObject searchResult) { + private boolean readNextBatchAndReturnFirstDocument(JsonNode searchResult) { //stop if no more data - JsonArray hits = searchResult.getAsJsonObject("hits").getAsJsonArray("hits"); + JsonNode hits = searchResult.path("hits").path("hits"); if (hits.size() == 0) { current = null; batchIterator = null; @@ -587,8 +583,8 @@ public class ElasticsearchIO { } // list behind iterator is empty List<String> batch = new ArrayList<>(); - for (JsonElement hit : hits) { - String document = hit.getAsJsonObject().getAsJsonObject("_source").toString(); + for (JsonNode hit : hits) { + String document = hit.path("_source").toString(); batch.add(document); } batchIterator = batch.listIterator(); @@ -780,26 +776,26 @@ public class ElasticsearchIO { Collections.<String, String>emptyMap(), requestBody, new BasicHeader("", "")); - JsonObject searchResult = parseResponse(response); - boolean errors = searchResult.getAsJsonPrimitive("errors").getAsBoolean(); + JsonNode searchResult = parseResponse(response); + boolean errors = searchResult.path("errors").asBoolean(); if (errors) { StringBuilder errorMessages = new StringBuilder( "Error writing to Elasticsearch, some elements could not be inserted:"); - JsonArray items = searchResult.getAsJsonArray("items"); + JsonNode items = searchResult.path("items"); //some items present in bulk might have errors, concatenate error messages - for (JsonElement item : items) { - JsonObject creationObject = item.getAsJsonObject().getAsJsonObject("create"); - JsonObject error = creationObject.getAsJsonObject("error"); + for (JsonNode item : items) { + JsonNode creationObject = item.path("create"); + JsonNode error = creationObject.get("error"); if (error != null) { - String type = error.getAsJsonPrimitive("type").getAsString(); - String reason = error.getAsJsonPrimitive("reason").getAsString(); - String docId = 
creationObject.getAsJsonPrimitive("_id").getAsString(); + String type = error.path("type").asText(); + String reason = error.path("reason").asText(); + String docId = creationObject.path("_id").asText(); errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type)); - JsonObject causedBy = error.getAsJsonObject("caused_by"); + JsonNode causedBy = error.get("caused_by"); if (causedBy != null) { - String cbReason = causedBy.getAsJsonPrimitive("reason").getAsString(); - String cbType = causedBy.getAsJsonPrimitive("type").getAsString(); + String cbReason = causedBy.path("reason").asText(); + String cbType = causedBy.path("type").asText(); errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType)); } }