http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/HttpBasedClient.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/HttpBasedClient.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/HttpBasedClient.java index 722f0a4..f2a9f02 100644 --- a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/HttpBasedClient.java +++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/HttpBasedClient.java @@ -21,28 +21,34 @@ import com.google.common.base.Joiner; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonParseException; + +import org.apache.commons.lang3.StringUtils; +import org.json.JSONArray; +import org.json.JSONObject; + +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; + import com.mashape.unirest.http.HttpResponse; import com.mashape.unirest.http.JsonNode; import com.mashape.unirest.http.Unirest; import com.mashape.unirest.http.exceptions.UnirestException; import com.mashape.unirest.request.HttpRequest; import com.mashape.unirest.request.HttpRequestWithBody; -import java.io.UnsupportedEncodingException; -import java.net.URLEncoder; -import java.util.Iterator; -import java.util.Map; -import java.util.Properties; -import org.apache.commons.lang3.StringUtils; + import org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter; import org.apache.zeppelin.elasticsearch.action.ActionException; import org.apache.zeppelin.elasticsearch.action.ActionResponse; import org.apache.zeppelin.elasticsearch.action.AggWrapper; import org.apache.zeppelin.elasticsearch.action.AggWrapper.AggregationType; import org.apache.zeppelin.elasticsearch.action.HitWrapper; -import org.json.JSONArray; -import org.json.JSONObject; -/** Elasticsearch client using the HTTP API. */ +/** + * Elasticsearch client using the HTTP API. 
+ */ public class HttpBasedClient implements ElasticsearchClient { private static final String QUERY_STRING_TEMPLATE = "{ \"query\": { \"query_string\": { \"query\": \"_Q_\", \"analyze_wildcard\": \"true\" } } }"; @@ -112,11 +118,8 @@ public class HttpBasedClient implements ElasticsearchClient { } else { // There are differences: to avoid problems with some special characters // such as / and # in id, use a "terms" query - buffer - .append("/_search?source=") - .append( - URLEncoder.encode( - "{\"query\":{\"terms\":{\"_id\":[\"" + id + "\"]}}}", "UTF-8")); + buffer.append("/_search?source=").append(URLEncoder + .encode("{\"query\":{\"terms\":{\"_id\":[\"" + id + "\"]}}}", "UTF-8")); } } else { buffer.append("/").append(id); @@ -151,31 +154,28 @@ public class HttpBasedClient implements ElasticsearchClient { if (isSucceeded) { final JsonNode body = new JsonNode(result.getBody()); if (body.getObject().has("_index")) { - response = - new ActionResponse() - .succeeded(true) - .hit( - new HitWrapper( - getFieldAsString(body, "_index"), - getFieldAsString(body, "_type"), - getFieldAsString(body, "_id"), - getFieldAsString(body, "_source"))); + response = new ActionResponse() + .succeeded(true) + .hit(new HitWrapper( + getFieldAsString(body, "_index"), + getFieldAsString(body, "_type"), + getFieldAsString(body, "_id"), + getFieldAsString(body, "_source"))); } else { final JSONArray hits = getFieldAsArray(body.getObject(), "hits/hits"); final JSONObject hit = (JSONObject) hits.iterator().next(); - response = - new ActionResponse() - .succeeded(true) - .hit( - new HitWrapper( - hit.getString("_index"), - hit.getString("_type"), - hit.getString("_id"), - hit.opt("_source").toString())); + response = new ActionResponse() + .succeeded(true) + .hit(new HitWrapper( + hit.getString("_index"), + hit.getString("_type"), + hit.getString("_id"), + hit.opt("_source").toString())); } } else { if (result.getStatus() == 404) { - response = new ActionResponse().succeeded(false); + response = new ActionResponse() + .succeeded(false); } else { throw new ActionException(result.getBody()); } @@ -200,15 +200,13 @@ public class HttpBasedClient implements ElasticsearchClient { if (isSucceeded) { final JsonNode body = new JsonNode(result.getBody()); - response = - new ActionResponse() - .succeeded(true) - .hit( - new HitWrapper( - getFieldAsString(body, "_index"), - getFieldAsString(body, "_type"), - getFieldAsString(body, "_id"), - null)); + response = new ActionResponse() + .succeeded(true) + .hit(new HitWrapper( + getFieldAsString(body, "_index"), + getFieldAsString(body, "_type"), + getFieldAsString(body, "_id"), + null)); } else { throw new ActionException(result.getBody()); } @@ -231,8 +229,7 @@ public class HttpBasedClient implements ElasticsearchClient { request .header("Accept", "application/json") .header("Content-Type", "application/json") - .body(data) - .getHttpRequest(); + .body(data).getHttpRequest(); if (StringUtils.isNotEmpty(username)) { request.basicAuth(username, password); } @@ -241,15 +238,13 @@ public class HttpBasedClient implements ElasticsearchClient { final boolean isSucceeded = isSucceeded(result); if (isSucceeded) { - response = - new ActionResponse() - .succeeded(true) - .hit( - new HitWrapper( - getFieldAsString(result, "_index"), - getFieldAsString(result, "_type"), - getFieldAsString(result, "_id"), - null)); + response = new ActionResponse() + .succeeded(true) + .hit(new HitWrapper( + getFieldAsString(result, "_index"), + getFieldAsString(result, "_type"), + getFieldAsString(result, 
"_id"), + null)); } else { throw new ActionException(result.getBody().toString()); } @@ -275,9 +270,9 @@ public class HttpBasedClient implements ElasticsearchClient { } try { - final HttpRequestWithBody request = - Unirest.post(getUrl(indices, types) + "/_search?size=" + size) - .header("Content-Type", "application/json"); + final HttpRequestWithBody request = Unirest + .post(getUrl(indices, types) + "/_search?size=" + size) + .header("Content-Type", "application/json"); if (StringUtils.isNoneEmpty(query)) { request.header("Accept", "application/json").body(query); @@ -293,7 +288,9 @@ public class HttpBasedClient implements ElasticsearchClient { if (isSucceeded(result)) { final long total = getFieldAsLong(result, "hits/total"); - response = new ActionResponse().succeeded(true).totalHits(total); + response = new ActionResponse() + .succeeded(true) + .totalHits(total); if (containsAggs(result)) { JSONObject aggregationsMap = body.getJSONObject("aggregations"); @@ -301,7 +298,7 @@ public class HttpBasedClient implements ElasticsearchClient { aggregationsMap = body.getJSONObject("aggs"); } - for (final String key : aggregationsMap.keySet()) { + for (final String key: aggregationsMap.keySet()) { final JSONObject aggResult = aggregationsMap.getJSONObject(key); if (aggResult.has("buckets")) { // Multi-bucket aggregations @@ -322,13 +319,13 @@ public class HttpBasedClient implements ElasticsearchClient { while (iter.hasNext()) { final JSONObject hit = (JSONObject) iter.next(); - final Object data = hit.opt("_source") != null ? hit.opt("_source") : hit.opt("fields"); - response.addHit( - new HitWrapper( - hit.getString("_index"), - hit.getString("_type"), - hit.getString("_id"), - data.toString())); + final Object data = + hit.opt("_source") != null ? hit.opt("_source") : hit.opt("fields"); + response.addHit(new HitWrapper( + hit.getString("_index"), + hit.getString("_type"), + hit.getString("_id"), + data.toString())); } } } else { @@ -342,13 +339,14 @@ public class HttpBasedClient implements ElasticsearchClient { } private boolean containsAggs(HttpResponse<JsonNode> result) { - return result.getBody() != null - && (result.getBody().getObject().has("aggregations") - || result.getBody().getObject().has("aggs")); + return result.getBody() != null && + (result.getBody().getObject().has("aggregations") || + result.getBody().getObject().has("aggs")); } @Override - public void close() {} + public void close() { + } @Override public String toString() {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/TransportBasedClient.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/TransportBasedClient.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/TransportBasedClient.java index 0ea43cb..2af37bd 100644 --- a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/TransportBasedClient.java +++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/TransportBasedClient.java @@ -20,21 +20,8 @@ package org.apache.zeppelin.elasticsearch.client; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonSyntaxException; -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; + import org.apache.commons.lang3.StringUtils; -import org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter; -import org.apache.zeppelin.elasticsearch.action.ActionResponse; -import org.apache.zeppelin.elasticsearch.action.AggWrapper; -import org.apache.zeppelin.elasticsearch.action.HitWrapper; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; @@ -58,63 +45,96 @@ import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregat import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation; -/** Elasticsearch client using the transport protocol. */ +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter; +import org.apache.zeppelin.elasticsearch.action.ActionResponse; +import org.apache.zeppelin.elasticsearch.action.AggWrapper; +import org.apache.zeppelin.elasticsearch.action.HitWrapper; + +/** + * Elasticsearch client using the transport protocol. 
+ */ public class TransportBasedClient implements ElasticsearchClient { private final Gson gson = new GsonBuilder().setPrettyPrinting().create(); private final Client client; public TransportBasedClient(Properties props) throws UnknownHostException { - final String host = props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_HOST); - final int port = - Integer.parseInt(props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_PORT)); + final String host = + props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_HOST); + final int port = Integer.parseInt( + props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_PORT)); final String clusterName = props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_CLUSTER_NAME); - final Settings settings = - Settings.settingsBuilder().put("cluster.name", clusterName).put(props).build(); + final Settings settings = Settings.settingsBuilder() + .put("cluster.name", clusterName) + .put(props) + .build(); - client = - TransportClient.builder() - .settings(settings) - .build() - .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port)); + client = TransportClient.builder().settings(settings).build() + .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port)); } @Override public ActionResponse get(String index, String type, String id) { - final GetResponse getResp = client.prepareGet(index, type, id).get(); + final GetResponse getResp = client + .prepareGet(index, type, id) + .get(); return new ActionResponse() .succeeded(getResp.isExists()) - .hit( - new HitWrapper( - getResp.getIndex(), - getResp.getType(), - getResp.getId(), - getResp.getSourceAsString())); + .hit(new HitWrapper( + getResp.getIndex(), + getResp.getType(), + getResp.getId(), + getResp.getSourceAsString())); } @Override public ActionResponse delete(String index, String type, String id) { - final DeleteResponse delResp = client.prepareDelete(index, type, id).get(); + final DeleteResponse delResp = client + .prepareDelete(index, type, id) + .get(); return new ActionResponse() .succeeded(delResp.isFound()) - .hit(new HitWrapper(delResp.getIndex(), delResp.getType(), delResp.getId(), null)); + .hit(new HitWrapper( + delResp.getIndex(), + delResp.getType(), + delResp.getId(), + null)); } @Override public ActionResponse index(String index, String type, String id, String data) { - final IndexResponse idxResp = client.prepareIndex(index, type, id).setSource(data).get(); + final IndexResponse idxResp = client + .prepareIndex(index, type, id) + .setSource(data) + .get(); return new ActionResponse() .succeeded(idxResp.isCreated()) - .hit(new HitWrapper(idxResp.getIndex(), idxResp.getType(), idxResp.getId(), null)); + .hit(new HitWrapper( + idxResp.getIndex(), + idxResp.getType(), + idxResp.getId(), + null)); } @Override public ActionResponse search(String[] indices, String[] types, String query, int size) { - final SearchRequestBuilder reqBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); + final SearchRequestBuilder reqBuilder = new SearchRequestBuilder( + client, SearchAction.INSTANCE); reqBuilder.setIndices(); if (indices != null) { @@ -141,13 +161,14 @@ public class TransportBasedClient implements ElasticsearchClient { final SearchResponse searchResp = reqBuilder.get(); - final ActionResponse actionResp = - new ActionResponse().succeeded(true).totalHits(searchResp.getHits().getTotalHits()); + final ActionResponse actionResp = new ActionResponse() + .succeeded(true) + .totalHits(searchResp.getHits().getTotalHits()); if 
(searchResp.getAggregations() != null) { setAggregations(searchResp.getAggregations(), actionResp); } else { - for (final SearchHit hit : searchResp.getHits()) { + for (final SearchHit hit: searchResp.getHits()) { // Fields can be found either in _source, or in fields (it depends on the query) // => specific for elasticsearch's version < 5 // @@ -172,15 +193,11 @@ public class TransportBasedClient implements ElasticsearchClient { final Aggregation agg = aggregations.asList().get(0); if (agg instanceof InternalMetricsAggregation) { - actionResp.addAggregation( - new AggWrapper( - AggWrapper.AggregationType.SIMPLE, - XContentHelper.toString((InternalMetricsAggregation) agg).toString())); + actionResp.addAggregation(new AggWrapper(AggWrapper.AggregationType.SIMPLE, + XContentHelper.toString((InternalMetricsAggregation) agg).toString())); } else if (agg instanceof InternalSingleBucketAggregation) { - actionResp.addAggregation( - new AggWrapper( - AggWrapper.AggregationType.SIMPLE, - XContentHelper.toString((InternalSingleBucketAggregation) agg).toString())); + actionResp.addAggregation(new AggWrapper(AggWrapper.AggregationType.SIMPLE, + XContentHelper.toString((InternalSingleBucketAggregation) agg).toString())); } else if (agg instanceof InternalMultiBucketAggregation) { final Set<String> headerKeys = new HashSet<>(); final List<Map<String, Object>> buckets = new LinkedList<>(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java b/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java index c6e7966..4a412aa 100644 --- a/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java +++ b/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java @@ -21,6 +21,20 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import org.apache.commons.lang.math.RandomUtils; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.experimental.theories.DataPoint; +import org.junit.experimental.theories.Theories; +import org.junit.experimental.theories.Theory; +import org.junit.runner.RunWith; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -29,25 +43,13 @@ import java.util.List; import java.util.Properties; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.lang.math.RandomUtils; + import org.apache.zeppelin.completer.CompletionType; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import 
org.elasticsearch.client.Client; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.node.Node; -import org.elasticsearch.node.NodeBuilder; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.experimental.theories.DataPoint; -import org.junit.experimental.theories.Theories; -import org.junit.experimental.theories.Theory; -import org.junit.runner.RunWith; @RunWith(Theories.class) public class ElasticsearchInterpreterTest { @@ -57,8 +59,8 @@ public class ElasticsearchInterpreterTest { private static Client elsClient; private static Node elsNode; - private static final String[] METHODS = {"GET", "PUT", "DELETE", "POST"}; - private static final int[] STATUS = {200, 404, 500, 403}; + private static final String[] METHODS = { "GET", "PUT", "DELETE", "POST" }; + private static final int[] STATUS = { 200, 404, 500, 403 }; private static final String ELS_CLUSTER_NAME = "zeppelin-elasticsearch-interpreter-test"; private static final String ELS_HOST = "localhost"; @@ -70,8 +72,7 @@ public class ElasticsearchInterpreterTest { @BeforeClass public static void populate() throws IOException { - final Settings settings = - Settings.settingsBuilder() + final Settings settings = Settings.settingsBuilder() .put("cluster.name", ELS_CLUSTER_NAME) .put("network.host", ELS_HOST) .put("http.port", ELS_HTTP_PORT) @@ -82,58 +83,46 @@ public class ElasticsearchInterpreterTest { elsNode = NodeBuilder.nodeBuilder().settings(settings).node(); elsClient = elsNode.client(); - elsClient - .admin() - .indices() - .prepareCreate("logs") - .addMapping( - "http", - jsonBuilder() - .startObject() - .startObject("http") - .startObject("properties") - .startObject("content_length") - .field("type", "integer") - .endObject() - .endObject() - .endObject() - .endObject()) - .get(); + elsClient.admin().indices().prepareCreate("logs") + .addMapping("http", jsonBuilder() + .startObject().startObject("http").startObject("properties") + .startObject("content_length") + .field("type", "integer") + .endObject() + .endObject().endObject().endObject()).get(); for (int i = 0; i < 48; i++) { - elsClient - .prepareIndex("logs", "http", "" + i) - .setRefresh(true) - .setSource( - jsonBuilder() - .startObject() - .field("date", new Date()) - .startObject("request") - .field("method", METHODS[RandomUtils.nextInt(METHODS.length)]) - .field("url", "/zeppelin/" + UUID.randomUUID().toString()) - .field("headers", Arrays.asList("Accept: *.*", "Host: apache.org")) - .endObject() - .field("status", STATUS[RandomUtils.nextInt(STATUS.length)]) - .field("content_length", RandomUtils.nextInt(2000))) - .get(); + elsClient.prepareIndex("logs", "http", "" + i) + .setRefresh(true) + .setSource(jsonBuilder() + .startObject() + .field("date", new Date()) + .startObject("request") + .field("method", METHODS[RandomUtils.nextInt(METHODS.length)]) + .field("url", "/zeppelin/" + UUID.randomUUID().toString()) + .field("headers", Arrays.asList("Accept: *.*", "Host: apache.org")) + .endObject() + .field("status", STATUS[RandomUtils.nextInt(STATUS.length)]) + .field("content_length", RandomUtils.nextInt(2000)) + ) + .get(); } for (int i = 1; i < 3; i++) { - elsClient - .prepareIndex("logs", "http", "very/strange/id#" + i) - .setRefresh(true) - .setSource( - jsonBuilder() - .startObject() - .field("date", new Date()) - .startObject("request") - .field("method", METHODS[RandomUtils.nextInt(METHODS.length)]) - .field("url", "/zeppelin/" + UUID.randomUUID().toString()) - .field("headers", 
Arrays.asList("Accept: *.*", "Host: apache.org")) - .endObject() - .field("status", STATUS[RandomUtils.nextInt(STATUS.length)]) - .field("content_length", RandomUtils.nextInt(2000))) - .get(); + elsClient.prepareIndex("logs", "http", "very/strange/id#" + i) + .setRefresh(true) + .setSource(jsonBuilder() + .startObject() + .field("date", new Date()) + .startObject("request") + .field("method", METHODS[RandomUtils.nextInt(METHODS.length)]) + .field("url", "/zeppelin/" + UUID.randomUUID().toString()) + .field("headers", Arrays.asList("Accept: *.*", "Host: apache.org")) + .endObject() + .field("status", STATUS[RandomUtils.nextInt(STATUS.length)]) + .field("content_length", RandomUtils.nextInt(2000)) + ) + .get(); } final Properties props = new Properties(); @@ -192,8 +181,8 @@ public class ElasticsearchInterpreterTest { assertNotNull(ctx.getAngularObjectRegistry().get("count_testCount", null, null)); assertEquals(50L, ctx.getAngularObjectRegistry().get("count_testCount", null, null).get()); - res = - interpreter.interpret("count /logs { \"query\": { \"match\": { \"status\": 500 } } }", ctx); + res = interpreter.interpret("count /logs { \"query\": { \"match\": { \"status\": 500 } } }", + ctx); assertEquals(Code.SUCCESS, res.code()); } @@ -230,19 +219,15 @@ public class ElasticsearchInterpreterTest { res = interpreter.interpret("search /logs {{{hello}}}", ctx); assertEquals(Code.ERROR, res.code()); - res = - interpreter.interpret( - "search /logs { \"query\": { \"match\": { \"status\": 500 } } }", ctx); + res = interpreter.interpret("search /logs { \"query\": { \"match\": { \"status\": 500 } } }", + ctx); assertEquals(Code.SUCCESS, res.code()); res = interpreter.interpret("search /logs status:404", ctx); assertEquals(Code.SUCCESS, res.code()); - res = - interpreter.interpret( - "search /logs { \"fields\": [ \"date\", \"request.headers\" ], " - + "\"query\": { \"match\": { \"status\": 500 } } }", - ctx); + res = interpreter.interpret("search /logs { \"fields\": [ \"date\", \"request.headers\" ], " + + "\"query\": { \"match\": { \"status\": 500 } } }", ctx); assertEquals(Code.SUCCESS, res.code()); } @@ -251,75 +236,50 @@ public class ElasticsearchInterpreterTest { final InterpreterContext ctx = buildContext("agg"); // Single-value metric - InterpreterResult res = - interpreter.interpret( - "search /logs { \"aggs\" : " - + "{ \"distinct_status_count\" : " - + " { \"cardinality\" : { \"field\" : \"status\" } } } }", - ctx); + InterpreterResult res = interpreter.interpret("search /logs { \"aggs\" : " + + "{ \"distinct_status_count\" : " + + " { \"cardinality\" : { \"field\" : \"status\" } } } }", ctx); assertEquals(Code.SUCCESS, res.code()); // Multi-value metric - res = - interpreter.interpret( - "search /logs { \"aggs\" : { \"content_length_stats\" : " - + " { \"extended_stats\" : { \"field\" : \"content_length\" } } } }", - ctx); + res = interpreter.interpret("search /logs { \"aggs\" : { \"content_length_stats\" : " + + " { \"extended_stats\" : { \"field\" : \"content_length\" } } } }", ctx); assertEquals(Code.SUCCESS, res.code()); // Single bucket - res = - interpreter.interpret( - "search /logs { \"aggs\" : { " - + " \"200_OK\" : { \"filter\" : { \"term\": { \"status\": \"200\" } }, " - + " \"aggs\" : { \"avg_length\" : { \"avg\" : " - + "{ \"field\" : \"content_length\" } } } } } }", - ctx); + res = interpreter.interpret("search /logs { \"aggs\" : { " + + " \"200_OK\" : { \"filter\" : { \"term\": { \"status\": \"200\" } }, " + + " \"aggs\" : { \"avg_length\" : { \"avg\" : " + + "{ \"field\" : 
\"content_length\" } } } } } }", ctx); assertEquals(Code.SUCCESS, res.code()); // Multi-buckets - res = - interpreter.interpret( - "search /logs { \"aggs\" : { \"status_count\" : " - + " { \"terms\" : { \"field\" : \"status\" } } } }", - ctx); + res = interpreter.interpret("search /logs { \"aggs\" : { \"status_count\" : " + + " { \"terms\" : { \"field\" : \"status\" } } } }", ctx); assertEquals(Code.SUCCESS, res.code()); - res = - interpreter.interpret( - "search /logs { \"aggs\" : { " - + " \"length\" : { \"terms\": { \"field\": \"status\" }, " - + " \"aggs\" : { \"sum_length\" : { \"sum\" : { \"field\" : \"content_length\" } }, " - + "\"sum_status\" : { \"sum\" : { \"field\" : \"status\" } } } } } }", - ctx); + res = interpreter.interpret("search /logs { \"aggs\" : { " + + " \"length\" : { \"terms\": { \"field\": \"status\" }, " + + " \"aggs\" : { \"sum_length\" : { \"sum\" : { \"field\" : \"content_length\" } }, " + + "\"sum_status\" : { \"sum\" : { \"field\" : \"status\" } } } } } }", ctx); assertEquals(Code.SUCCESS, res.code()); } @Theory public void testIndex(ElasticsearchInterpreter interpreter) { - InterpreterResult res = - interpreter.interpret( - "index /logs { \"date\": \"" - + new Date() - + "\", \"method\": \"PUT\", \"status\": \"500\" }", - null); + InterpreterResult res = interpreter.interpret("index /logs { \"date\": \"" + new Date() + + "\", \"method\": \"PUT\", \"status\": \"500\" }", null); assertEquals(Code.ERROR, res.code()); res = interpreter.interpret("index /logs/http { bad ", null); assertEquals(Code.ERROR, res.code()); - res = - interpreter.interpret( - "index /logs/http { \"date\": \"2015-12-06T14:54:23.368Z\", " - + "\"method\": \"PUT\", \"status\": \"500\" }", - null); + res = interpreter.interpret("index /logs/http { \"date\": \"2015-12-06T14:54:23.368Z\", " + + "\"method\": \"PUT\", \"status\": \"500\" }", null); assertEquals(Code.SUCCESS, res.code()); - res = - interpreter.interpret( - "index /logs/http/1000 { \"date\": " - + "\"2015-12-06T14:54:23.368Z\", \"method\": \"PUT\", \"status\": \"500\" }", - null); + res = interpreter.interpret("index /logs/http/1000 { \"date\": " + + "\"2015-12-06T14:54:23.368Z\", \"method\": \"PUT\", \"status\": \"500\" }", null); assertEquals(Code.SUCCESS, res.code()); } @@ -348,10 +308,10 @@ public class ElasticsearchInterpreterTest { @Theory public void testCompletion(ElasticsearchInterpreter interpreter) { - final List<InterpreterCompletion> expectedResultOne = - Arrays.asList(new InterpreterCompletion("count", "count", CompletionType.command.name())); - final List<InterpreterCompletion> expectedResultTwo = - Arrays.asList(new InterpreterCompletion("help", "help", CompletionType.command.name())); + final List<InterpreterCompletion> expectedResultOne = Arrays.asList( + new InterpreterCompletion("count", "count", CompletionType.command.name())); + final List<InterpreterCompletion> expectedResultTwo = Arrays.asList( + new InterpreterCompletion("help", "help", CompletionType.command.name())); final List<InterpreterCompletion> resultOne = interpreter.completion("co", 0, null); final List<InterpreterCompletion> resultTwo = interpreter.completion("he", 0, null); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/file/pom.xml ---------------------------------------------------------------------- diff --git a/file/pom.xml b/file/pom.xml index e649991..ed0ef3f 100644 --- a/file/pom.xml +++ b/file/pom.xml @@ -91,6 +91,13 @@ <plugin> <artifactId>maven-resources-plugin</artifactId> </plugin> + <plugin> + 
<groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <configuration> + <skip>false</skip> + </configuration> + </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java ---------------------------------------------------------------------- diff --git a/file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java b/file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java index fc2096f..eea5650 100644 --- a/file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java +++ b/file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java @@ -1,19 +1,26 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.zeppelin.file; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; @@ -21,6 +28,7 @@ import java.util.HashSet; import java.util.List; import java.util.Properties; import java.util.StringTokenizer; + import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; @@ -30,10 +38,11 @@ import org.apache.zeppelin.interpreter.InterpreterResult.Type; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -/** File interpreter for Zeppelin. */ +/** + * File interpreter for Zeppelin. + * + */ public abstract class FileInterpreter extends Interpreter { Logger logger = LoggerFactory.getLogger(FileInterpreter.class); String currentDir = null; @@ -44,7 +53,9 @@ public abstract class FileInterpreter extends Interpreter { currentDir = new String("/"); } - /** Handling the arguments of the command. */ + /** + * Handling the arguments of the command. 
+ */ public class CommandArgs { public String input = null; public String command = null; @@ -58,12 +69,12 @@ public abstract class FileInterpreter extends Interpreter { } private void parseArg(String arg) { - if (arg.charAt(0) == '-') { // handle flags + if (arg.charAt(0) == '-') { // handle flags for (int i = 0; i < arg.length(); i++) { Character c = arg.charAt(i); flags.add(c); } - } else { // handle other args + } else { // handle other args args.add(arg); } } @@ -88,7 +99,7 @@ public abstract class FileInterpreter extends Interpreter { public abstract boolean isDirectory(String path); // Combine paths, takes care of arguments such as .. - protected String getNewPath(String argument) { + protected String getNewPath(String argument){ Path arg = Paths.get(argument); Path ret = arg.isAbsolute() ? arg : Paths.get(currentDir, argument); return ret.normalize().toString(); @@ -134,7 +145,8 @@ public abstract class FileInterpreter extends Interpreter { } @Override - public void cancel(InterpreterContext context) {} + public void cancel(InterpreterContext context) { + } @Override public FormType getFormType() { @@ -148,13 +160,13 @@ public abstract class FileInterpreter extends Interpreter { @Override public Scheduler getScheduler() { - return SchedulerFactory.singleton() - .createOrGetFIFOScheduler(FileInterpreter.class.getName() + this.hashCode()); + return SchedulerFactory.singleton().createOrGetFIFOScheduler( + FileInterpreter.class.getName() + this.hashCode()); } @Override - public List<InterpreterCompletion> completion( - String buf, int cursor, InterpreterContext interpreterContext) { + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { return null; } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java ---------------------------------------------------------------------- diff --git a/file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java b/file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java index 7c70eb7..6b3dc4b 100644 --- a/file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java +++ b/file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java @@ -1,35 +1,47 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. 
See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.zeppelin.file; +import org.slf4j.Logger; + import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; + import javax.ws.rs.core.UriBuilder; -import org.slf4j.Logger; -/** Definition and HTTP invocation methods for all WebHDFS commands. */ +/** + * Definition and HTTP invocation methods for all WebHDFS commands. + */ public class HDFSCommand { - /** Type of HTTP request. */ + /** + * Type of HTTP request. + */ public enum HttpType { GET, PUT } - /** Definition of WebHDFS operator. */ + /** + * Definition of WebHDFS operator. + */ public class Op { public String op; public HttpType cmd; @@ -42,7 +54,9 @@ public class HDFSCommand { } } - /** Definition of argument to an operator. */ + /** + * Definition of argument to an operator. + */ public class Arg { public String key; public String value; @@ -72,9 +86,11 @@ public class HDFSCommand { } public String checkArgs(Op op, String path, Arg[] args) throws Exception { - if (op == null - || path == null - || (op.minArgs > 0 && (args == null || args.length != op.minArgs))) { + if (op == null || + path == null || + (op.minArgs > 0 && + (args == null || + args.length != op.minArgs))) { String a = ""; a = (op != null) ? a + op.op + "\n" : a; a = (path != null) ? a + path + "\n" : a; @@ -94,7 +110,10 @@ public class HDFSCommand { } // Build URI - UriBuilder builder = UriBuilder.fromPath(url).path(path).queryParam("op", op.op); + UriBuilder builder = UriBuilder + .fromPath(url) + .path(path) + .queryParam("op", op.op); if (args != null) { for (Arg a : args) { @@ -113,7 +132,8 @@ public class HDFSCommand { logger.info("Sending 'GET' request to URL : " + hdfsUrl); logger.info("Response Code : " + responseCode); StringBuffer response = new StringBuffer(); - try (BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream())); ) { + try (BufferedReader in = new BufferedReader( + new InputStreamReader(con.getInputStream()));) { String inputLine; while ((inputLine = in.readLine()) != null) { response.append(inputLine); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java ---------------------------------------------------------------------- diff --git a/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java b/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java index 3d33e06..b27dcb6 100644 --- a/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java +++ b/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java @@ -1,33 +1,42 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
You may obtain a - * copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.zeppelin.file; import com.google.gson.Gson; + import com.google.gson.annotations.SerializedName; +import org.apache.commons.lang.StringUtils; + import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Properties; -import org.apache.commons.lang.StringUtils; + import org.apache.zeppelin.completer.CompletionType; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -/** HDFS implementation of File interpreter for Zeppelin. */ +/** + * HDFS implementation of File interpreter for Zeppelin. + */ public class HDFSFileInterpreter extends FileInterpreter { static final String HDFS_URL = "hdfs.url"; static final String HDFS_USER = "hdfs.user"; @@ -45,7 +54,7 @@ public class HDFSFileInterpreter extends FileInterpreter { gson = new Gson(); } - public HDFSFileInterpreter(Properties property) { + public HDFSFileInterpreter(Properties property){ super(property); prepare(); } @@ -53,7 +62,7 @@ public class HDFSFileInterpreter extends FileInterpreter { /** * Status of one file. * - * <p>matches returned JSON + * matches returned JSON */ public class OneFileStatus { public long accessTime; @@ -92,7 +101,7 @@ public class HDFSFileInterpreter extends FileInterpreter { /** * Status of one file. * - * <p>matches returned JSON + * matches returned JSON */ public class SingleFileStatus { @SerializedName("FileStatus") @@ -102,7 +111,7 @@ public class HDFSFileInterpreter extends FileInterpreter { /** * Status of all files in a directory. * - * <p>matches returned JSON + * matches returned JSON */ public class MultiFileStatus { @SerializedName("FileStatus") @@ -112,7 +121,7 @@ public class HDFSFileInterpreter extends FileInterpreter { /** * Status of all files in a directory. 
* - * <p>matches returned JSON + * matches returned JSON */ public class AllFileStatus { @SerializedName("FileStatuses") @@ -137,25 +146,26 @@ public class HDFSFileInterpreter extends FileInterpreter { } @Override - public void close() {} + public void close() { + } private String listDir(String path) throws Exception { return cmd.runCommand(cmd.listStatus, path, null); } - private String listPermission(OneFileStatus fs) { + private String listPermission(OneFileStatus fs){ StringBuilder sb = new StringBuilder(); sb.append(fs.type.equalsIgnoreCase("Directory") ? 'd' : '-'); int p = Integer.parseInt(fs.permission, 16); sb.append(((p & 0x400) == 0) ? '-' : 'r'); sb.append(((p & 0x200) == 0) ? '-' : 'w'); sb.append(((p & 0x100) == 0) ? '-' : 'x'); - sb.append(((p & 0x40) == 0) ? '-' : 'r'); - sb.append(((p & 0x20) == 0) ? '-' : 'w'); - sb.append(((p & 0x10) == 0) ? '-' : 'x'); - sb.append(((p & 0x4) == 0) ? '-' : 'r'); - sb.append(((p & 0x2) == 0) ? '-' : 'w'); - sb.append(((p & 0x1) == 0) ? '-' : 'x'); + sb.append(((p & 0x40) == 0) ? '-' : 'r'); + sb.append(((p & 0x20) == 0) ? '-' : 'w'); + sb.append(((p & 0x10) == 0) ? '-' : 'x'); + sb.append(((p & 0x4) == 0) ? '-' : 'r'); + sb.append(((p & 0x2) == 0) ? '-' : 'w'); + sb.append(((p & 0x1) == 0) ? '-' : 'x'); return sb.toString(); } @@ -170,7 +180,7 @@ public class HDFSFileInterpreter extends FileInterpreter { sb.append(((fs.replication == 0) ? "-" : fs.replication) + "\t "); sb.append(fs.owner + "\t"); sb.append(fs.group + "\t"); - if (args.flags.contains(new Character('h'))) { // human readable + if (args.flags.contains(new Character('h'))){ //human readable sb.append(humanReadableByteCount(fs.length) + "\t\t"); } else { sb.append(fs.length + "\t"); @@ -214,19 +224,17 @@ public class HDFSFileInterpreter extends FileInterpreter { } try { - // see if directory. + //see if directory. if (isDirectory(path)) { String sfs = listDir(path); if (sfs != null) { AllFileStatus allFiles = gson.fromJson(sfs, AllFileStatus.class); - if (allFiles != null - && allFiles.fileStatuses != null - && allFiles.fileStatuses.fileStatus != null) { - int length = - cmd.maxLength < allFiles.fileStatuses.fileStatus.length - ? cmd.maxLength - : allFiles.fileStatuses.fileStatus.length; + if (allFiles != null && + allFiles.fileStatuses != null && + allFiles.fileStatuses.fileStatus != null) { + int length = cmd.maxLength < allFiles.fileStatuses.fileStatus.length ? 
cmd.maxLength : + allFiles.fileStatuses.fileStatus.length; for (int index = 0; index < length; index++) { OneFileStatus fs = allFiles.fileStatuses.fileStatus[index]; all = all + listOne(path, fs) + '\n'; @@ -263,8 +271,8 @@ public class HDFSFileInterpreter extends FileInterpreter { } @Override - public List<InterpreterCompletion> completion( - String buf, int cursor, InterpreterContext interpreterContext) { + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { logger.info("Completion request at position\t" + cursor + " in string " + buf); final List<InterpreterCompletion> suggestions = new ArrayList<>(); if (StringUtils.isEmpty(buf)) { @@ -274,16 +282,19 @@ public class HDFSFileInterpreter extends FileInterpreter { return suggestions; } - // part of a command == no spaces - if (buf.split(" ").length == 1) { + //part of a command == no spaces + if (buf.split(" ").length == 1){ if ("cd".contains(buf)) { - suggestions.add(new InterpreterCompletion("cd", "cd", CompletionType.command.name())); + suggestions.add(new InterpreterCompletion("cd", "cd", + CompletionType.command.name())); } if ("ls".contains(buf)) { - suggestions.add(new InterpreterCompletion("ls", "ls", CompletionType.command.name())); + suggestions.add(new InterpreterCompletion("ls", "ls", + CompletionType.command.name())); } if ("pwd".contains(buf)) { - suggestions.add(new InterpreterCompletion("pwd", "pwd", CompletionType.command.name())); + suggestions.add(new InterpreterCompletion("pwd", "pwd", + CompletionType.command.name())); } return suggestions; @@ -291,36 +302,35 @@ public class HDFSFileInterpreter extends FileInterpreter { // last word will contain the path we're working with. String lastToken = buf.substring(buf.lastIndexOf(" ") + 1); - if (lastToken.startsWith("-")) { // flag not path + if (lastToken.startsWith("-")) { //flag not path return null; } - String localPath = ""; // all things before the last '/' - String unfinished = lastToken; // unfinished filenames or directories + String localPath = ""; //all things before the last '/' + String unfinished = lastToken; //unfinished filenames or directories if (lastToken.contains("/")) { localPath = lastToken.substring(0, lastToken.lastIndexOf('/') + 1); unfinished = lastToken.substring(lastToken.lastIndexOf('/') + 1); } - String globalPath = getNewPath(localPath); // adjust for cwd + String globalPath = getNewPath(localPath); //adjust for cwd - if (isDirectory(globalPath)) { + if (isDirectory(globalPath)){ try { String fileStatusString = listDir(globalPath); if (fileStatusString != null) { AllFileStatus allFiles = gson.fromJson(fileStatusString, AllFileStatus.class); - if (allFiles != null - && allFiles.fileStatuses != null - && allFiles.fileStatuses.fileStatus != null) { + if (allFiles != null && + allFiles.fileStatuses != null && + allFiles.fileStatuses.fileStatus != null) { for (OneFileStatus fs : allFiles.fileStatuses.fileStatus) { if (fs.pathSuffix.contains(unfinished)) { - // only suggest the text after the last . + //only suggest the text after the last . String beforeLastPeriod = unfinished.substring(0, unfinished.lastIndexOf('.') + 1); - // beforeLastPeriod should be the start of fs.pathSuffix, so take the end of it. + //beforeLastPeriod should be the start of fs.pathSuffix, so take the end of it. 
String suggestedFinish = fs.pathSuffix.substring(beforeLastPeriod.length()); - suggestions.add( - new InterpreterCompletion( - suggestedFinish, suggestedFinish, CompletionType.path.name())); + suggestions.add(new InterpreterCompletion(suggestedFinish, suggestedFinish, + CompletionType.path.name())); } } return suggestions; @@ -334,7 +344,7 @@ public class HDFSFileInterpreter extends FileInterpreter { logger.info("path is not a directory. No values suggested."); } - // Error in string. + //Error in string. return null; } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/file/src/test/java/org/apache/zeppelin/file/HDFSFileInterpreterTest.java ---------------------------------------------------------------------- diff --git a/file/src/test/java/org/apache/zeppelin/file/HDFSFileInterpreterTest.java b/file/src/test/java/org/apache/zeppelin/file/HDFSFileInterpreterTest.java index 6267245..aa69886 100644 --- a/file/src/test/java/org/apache/zeppelin/file/HDFSFileInterpreterTest.java +++ b/file/src/test/java/org/apache/zeppelin/file/HDFSFileInterpreterTest.java @@ -1,34 +1,44 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.zeppelin.file; import static org.junit.Assert.assertNull; import com.google.gson.Gson; + +import junit.framework.TestCase; + +import org.junit.Test; +import org.slf4j.Logger; + import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Properties; -import junit.framework.TestCase; + import org.apache.zeppelin.completer.CompletionType; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.junit.Test; -import org.slf4j.Logger; -/** Tests Interpreter by running pre-determined commands against mock file system. */ +/** + * Tests Interpreter by running pre-determined commands against mock file system. 
+ */ public class HDFSFileInterpreterTest extends TestCase { @Test public void testMaxLength() { @@ -111,10 +121,10 @@ public class HDFSFileInterpreterTest extends TestCase { assertEquals(result1.message().get(0).getData(), result11.message().get(0).getData()); // auto completion test - List expectedResultOne = - Arrays.asList(new InterpreterCompletion("ls", "ls", CompletionType.command.name())); - List expectedResultTwo = - Arrays.asList(new InterpreterCompletion("pwd", "pwd", CompletionType.command.name())); + List expectedResultOne = Arrays.asList( + new InterpreterCompletion("ls", "ls", CompletionType.command.name())); + List expectedResultTwo = Arrays.asList( + new InterpreterCompletion("pwd", "pwd", CompletionType.command.name())); List<InterpreterCompletion> resultOne = t.completion("l", 0, null); List<InterpreterCompletion> resultTwo = t.completion("p", 0, null); @@ -125,93 +135,93 @@ public class HDFSFileInterpreterTest extends TestCase { } } -/** Store command results from curl against a real file system. */ +/** + * Store command results from curl against a real file system. + */ class MockFileSystem { HashMap<String, String> mfs = new HashMap<>(); static final String FILE_STATUSES = - "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16389," - + "\"group\":\"hadoop\",\"length\":0,\"modificationTime\":1438548219672," - + "\"owner\":\"yarn\",\"pathSuffix\":\"app-logs\",\"permission\":\"777\"," - + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" - + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16395," - + "\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548030045," - + "\"owner\":\"hdfs\",\"pathSuffix\":\"hdp\",\"permission\":\"755\"," - + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" - + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16390," - + "\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438547985336," - + "\"owner\":\"mapred\",\"pathSuffix\":\"mapred\",\"permission\":\"755\"," - + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" - + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":2,\"fileId\":16392," - + "\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438547985346," - + "\"owner\":\"hdfs\",\"pathSuffix\":\"mr-history\",\"permission\":\"755\"," - + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" - + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16400," - + "\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548089725," - + "\"owner\":\"hdfs\",\"pathSuffix\":\"system\",\"permission\":\"755\"," - + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" - + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16386," - + "\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548150089," - + "\"owner\":\"hdfs\",\"pathSuffix\":\"tmp\",\"permission\":\"777\"," - + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" - + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16387," - + "\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438547921792," - + "\"owner\":\"hdfs\",\"pathSuffix\":\"user\",\"permission\":\"755\"," - + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}\n"; + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16389," + + "\"group\":\"hadoop\",\"length\":0,\"modificationTime\":1438548219672," + + "\"owner\":\"yarn\",\"pathSuffix\":\"app-logs\",\"permission\":\"777\"," + + 
"\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16395," + + "\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548030045," + + "\"owner\":\"hdfs\",\"pathSuffix\":\"hdp\",\"permission\":\"755\"," + + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16390," + + "\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438547985336," + + "\"owner\":\"mapred\",\"pathSuffix\":\"mapred\",\"permission\":\"755\"," + + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":2,\"fileId\":16392," + + "\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438547985346," + + "\"owner\":\"hdfs\",\"pathSuffix\":\"mr-history\",\"permission\":\"755\"," + + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16400," + + "\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548089725," + + "\"owner\":\"hdfs\",\"pathSuffix\":\"system\",\"permission\":\"755\"," + + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16386," + + "\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548150089," + + "\"owner\":\"hdfs\",\"pathSuffix\":\"tmp\",\"permission\":\"777\"," + + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16387," + + "\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438547921792," + + "\"owner\":\"hdfs\",\"pathSuffix\":\"user\",\"permission\":\"755\"," + + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}\n"; void addListStatusData() { - mfs.put("/?op=LISTSTATUS", "{\"FileStatuses\":{\"FileStatus\":[\n" + FILE_STATUSES + "]}}"); - mfs.put( - "/user?op=LISTSTATUS", - "{\"FileStatuses\":{\"FileStatus\":[\n" - + " {\"accessTime\":0,\"blockSize\":0,\"childrenNum\":4,\"fileId\":16388," - + "\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1441253161263," - + "\"owner\":\"ambari-qa\",\"pathSuffix\":\"ambari-qa\",\"permission\":\"770\"," - + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}\n" - + " ]}}"); - mfs.put( - "/tmp?op=LISTSTATUS", - "{\"FileStatuses\":{\"FileStatus\":[\n" - + " {\"accessTime\":1441253097489,\"blockSize\":134217728,\"childrenNum\":0," - + "\"fileId\":16400,\"group\":\"hdfs\",\"length\":1645," - + "\"modificationTime\":1441253097517,\"owner\":\"hdfs\"," - + "\"pathSuffix\":\"ida8c06540_date040315\",\"permission\":\"755\"," - + "\"replication\":3,\"storagePolicy\":0,\"type\":\"FILE\"}\n" - + " ]}}"); - mfs.put( - "/mr-history/done?op=LISTSTATUS", - "{\"FileStatuses\":{\"FileStatus\":[\n" - + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16433," - + "\"group\":\"hadoop\",\"length\":0,\"modificationTime\":1441253197481," - + "\"owner\":\"mapred\",\"pathSuffix\":\"2015\",\"permission\":\"770\"," - + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}\n" - + "]}}"); + mfs.put("/?op=LISTSTATUS", + "{\"FileStatuses\":{\"FileStatus\":[\n" + FILE_STATUSES + + "]}}" + ); + mfs.put("/user?op=LISTSTATUS", "{\"FileStatuses\":{\"FileStatus\":[\n" + + " {\"accessTime\":0,\"blockSize\":0,\"childrenNum\":4,\"fileId\":16388," + + "\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1441253161263," + + 
"\"owner\":\"ambari-qa\",\"pathSuffix\":\"ambari-qa\",\"permission\":\"770\"," + + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}\n" + + " ]}}" + ); + mfs.put("/tmp?op=LISTSTATUS", + "{\"FileStatuses\":{\"FileStatus\":[\n" + + " {\"accessTime\":1441253097489,\"blockSize\":134217728,\"childrenNum\":0," + + "\"fileId\":16400,\"group\":\"hdfs\",\"length\":1645," + + "\"modificationTime\":1441253097517,\"owner\":\"hdfs\"," + + "\"pathSuffix\":\"ida8c06540_date040315\",\"permission\":\"755\"," + + "\"replication\":3,\"storagePolicy\":0,\"type\":\"FILE\"}\n" + + " ]}}" + ); + mfs.put("/mr-history/done?op=LISTSTATUS", + "{\"FileStatuses\":{\"FileStatus\":[\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16433," + + "\"group\":\"hadoop\",\"length\":0,\"modificationTime\":1441253197481," + + "\"owner\":\"mapred\",\"pathSuffix\":\"2015\",\"permission\":\"770\"," + + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}\n" + + "]}}" + ); } void addGetFileStatusData() { - mfs.put( - "/?op=GETFILESTATUS", - "{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":7,\"fileId\":16385," - + "\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548089725," - + "\"owner\":\"hdfs\",\"pathSuffix\":\"\",\"permission\":\"755\"," - + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}"); - mfs.put( - "/user?op=GETFILESTATUS", - "{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16387," - + "\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1441253043188," - + "\"owner\":\"hdfs\",\"pathSuffix\":\"\",\"permission\":\"755\"," - + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}"); - mfs.put( - "/tmp?op=GETFILESTATUS", - "{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16386," - + "\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1441253097489," - + "\"owner\":\"hdfs\",\"pathSuffix\":\"\",\"permission\":\"777\"," - + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}"); - mfs.put( - "/mr-history/done?op=GETFILESTATUS", - "{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16393," - + "\"group\":\"hadoop\",\"length\":0,\"modificationTime\":1441253197480," - + "\"owner\":\"mapred\",\"pathSuffix\":\"\",\"permission\":\"777\"," - + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}"); + mfs.put("/?op=GETFILESTATUS", + "{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":7,\"fileId\":16385," + + "\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548089725," + + "\"owner\":\"hdfs\",\"pathSuffix\":\"\",\"permission\":\"755\"," + + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}"); + mfs.put("/user?op=GETFILESTATUS", + "{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16387," + + "\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1441253043188," + + "\"owner\":\"hdfs\",\"pathSuffix\":\"\",\"permission\":\"755\"," + + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}"); + mfs.put("/tmp?op=GETFILESTATUS", + "{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16386," + + "\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1441253097489," + + "\"owner\":\"hdfs\",\"pathSuffix\":\"\",\"permission\":\"777\"," + + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}"); + mfs.put("/mr-history/done?op=GETFILESTATUS", + "{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16393," 
+ + "\"group\":\"hadoop\",\"length\":0,\"modificationTime\":1441253197480," + + "\"owner\":\"mapred\",\"pathSuffix\":\"\",\"permission\":\"777\"," + + "\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}"); } public void addMockData(HDFSCommand.Op op) { @@ -228,7 +238,9 @@ class MockFileSystem { } } -/** Run commands against mock file system that simulates webhdfs responses. */ +/** + * Run commands against mock file system that simulates webhdfs responses. + */ class MockHDFSCommand extends HDFSCommand { MockFileSystem fs = null; @@ -259,14 +271,15 @@ class MockHDFSCommand extends HDFSCommand { } } -/** Mock Interpreter - uses Mock HDFS command. */ +/** + * Mock Interpreter - uses Mock HDFS command. + */ class MockHDFSFileInterpreter extends HDFSFileInterpreter { @Override public void prepare() { // Run commands against mock File System instead of WebHDFS - int i = - Integer.parseInt( - getProperty(HDFS_MAXLENGTH) == null ? "1000" : getProperty(HDFS_MAXLENGTH)); + int i = Integer.parseInt(getProperty(HDFS_MAXLENGTH) == null ? "1000" + : getProperty(HDFS_MAXLENGTH)); cmd = new MockHDFSCommand("", "", logger, i); gson = new Gson(); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/flink/pom.xml ---------------------------------------------------------------------- diff --git a/flink/pom.xml b/flink/pom.xml index ffba436..217813b 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -297,6 +297,14 @@ <plugin> <artifactId>maven-resources-plugin</artifactId> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <configuration> + <skip>false</skip> + </configuration> + </plugin> + </plugins> </build> </project> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java ---------------------------------------------------------------------- diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java index cab30b6..c14407d 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java @@ -17,9 +17,6 @@ package org.apache.zeppelin.flink; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; import org.apache.flink.api.scala.ExecutionEnvironment; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -27,6 +24,10 @@ import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + public class FlinkInterpreter extends Interpreter { private FlinkScalaInterpreter innerIntp; @@ -43,11 +44,8 @@ public class FlinkInterpreter extends Interpreter { // bind ZeppelinContext int maxRow = Integer.parseInt(getProperty("zeppelin.flink.maxResult", "1000")); - this.z = - new FlinkZeppelinContext( - innerIntp.getBatchTableEnviroment(), - getInterpreterGroup().getInterpreterHookRegistry(), - maxRow); + this.z = new FlinkZeppelinContext(innerIntp.getBatchTableEnviroment(), + getInterpreterGroup().getInterpreterHookRegistry(), maxRow); List<String> modifiers = new ArrayList<>(); modifiers.add("@transient"); this.innerIntp.bind("z", z.getClass().getCanonicalName(), z, modifiers); @@ -68,7 +66,9 
@@ public class FlinkInterpreter extends Interpreter { } @Override - public void cancel(InterpreterContext context) throws InterpreterException {} + public void cancel(InterpreterContext context) throws InterpreterException { + + } @Override public FormType getFormType() throws InterpreterException { @@ -81,8 +81,10 @@ public class FlinkInterpreter extends Interpreter { } @Override - public List<InterpreterCompletion> completion( - String buf, int cursor, InterpreterContext interpreterContext) throws InterpreterException { + public List<InterpreterCompletion> completion(String buf, + int cursor, + InterpreterContext interpreterContext) + throws InterpreterException { return innerIntp.completion(buf, cursor, interpreterContext); } @@ -97,4 +99,5 @@ public class FlinkInterpreter extends Interpreter { FlinkZeppelinContext getZeppelinContext() { return this.z; } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/flink/src/main/java/org/apache/zeppelin/flink/FlinkSQLInterpreter.java ---------------------------------------------------------------------- diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSQLInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSQLInterpreter.java index 25830a7..1ac3547 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSQLInterpreter.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSQLInterpreter.java @@ -17,12 +17,14 @@ package org.apache.zeppelin.flink; -import java.util.Properties; + import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; +import java.util.Properties; + public class FlinkSQLInterpreter extends Interpreter { private FlinkSQLScalaInterpreter sqlScalaInterpreter; @@ -31,18 +33,21 @@ public class FlinkSQLInterpreter extends Interpreter { super(properties); } + @Override public void open() throws InterpreterException { FlinkInterpreter flinkInterpreter = getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class); FlinkZeppelinContext z = flinkInterpreter.getZeppelinContext(); int maxRow = Integer.parseInt(getProperty("zeppelin.flink.maxResult", "1000")); - this.sqlScalaInterpreter = - new FlinkSQLScalaInterpreter(flinkInterpreter.getInnerScalaInterpreter(), z, maxRow); + this.sqlScalaInterpreter = new FlinkSQLScalaInterpreter( + flinkInterpreter.getInnerScalaInterpreter(), z, maxRow); } @Override - public void close() throws InterpreterException {} + public void close() throws InterpreterException { + + } @Override public InterpreterResult interpret(String st, InterpreterContext context) @@ -51,7 +56,9 @@ public class FlinkSQLInterpreter extends Interpreter { } @Override - public void cancel(InterpreterContext context) throws InterpreterException {} + public void cancel(InterpreterContext context) throws InterpreterException { + + } @Override public FormType getFormType() throws InterpreterException { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java index bec8389..0c42139 100644 --- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java +++ 
b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java @@ -16,15 +16,7 @@ */ package org.apache.zeppelin.flink; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.ui.CheckBox; import org.apache.zeppelin.display.ui.Select; @@ -41,6 +33,16 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class FlinkInterpreterTest { private FlinkInterpreter interpreter; @@ -69,8 +71,8 @@ public class FlinkInterpreterTest { @Test public void testBasicScala() throws InterpreterException, IOException { - InterpreterResult result = - interpreter.interpret("val a=\"hello world\"", getInterpreterContext()); + InterpreterResult result = interpreter.interpret("val a=\"hello world\"", + getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals("a: String = hello world\n", output); @@ -100,42 +102,38 @@ public class FlinkInterpreterTest { result = interpreter.interpret("/*comment here*/", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - result = - interpreter.interpret("/*comment here*/\nprint(\"hello world\")", getInterpreterContext()); + result = interpreter.interpret("/*comment here*/\nprint(\"hello world\")", + getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); // multiple line comment - result = interpreter.interpret("/*line 1 \n line 2*/", getInterpreterContext()); + result = interpreter.interpret("/*line 1 \n line 2*/", + getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); // test function - result = - interpreter.interpret("def add(x:Int, y:Int)\n{ return x+y }", getInterpreterContext()); + result = interpreter.interpret("def add(x:Int, y:Int)\n{ return x+y }", + getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); result = interpreter.interpret("print(add(1,2))", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - result = - interpreter.interpret( - "/*line 1 \n line 2*/print(\"hello world\")", getInterpreterContext()); + result = interpreter.interpret("/*line 1 \n line 2*/print(\"hello world\")", + getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); // companion object - result = - interpreter.interpret( - "class Counter {\n " - + "var value: Long = 0} \n" - + "object Counter {\n def apply(x: Long) = new Counter()\n}", - getInterpreterContext()); + result = interpreter.interpret("class Counter {\n " + + "var value: Long = 0} \n" + + "object Counter {\n def apply(x: Long) = new Counter()\n}", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); // case class - result = - interpreter.interpret( - "case class Bank(age:Integer, job:String, marital : String, education : String," - + " balance : Integer)\n", - getInterpreterContext()); + result = interpreter.interpret( + "case class 
Bank(age:Integer, job:String, marital : String, education : String," + + " balance : Integer)\n", + getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); // ZeppelinContext @@ -143,12 +141,14 @@ public class FlinkInterpreterTest { result = interpreter.interpret("val ds = benv.fromElements(1,2,3)\nz.show(ds)", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals(InterpreterResult.Type.TABLE, messageOutput.get(0).getType()); - assertEquals( - "f0\n" + "1\n" + "2\n" + "3\n", - messageOutput.get(0).toInterpreterResultMessage().getData()); + assertEquals("f0\n" + + "1\n" + + "2\n" + + "3\n", messageOutput.get(0).toInterpreterResultMessage().getData()); context = getInterpreterContext(); - result = interpreter.interpret("z.input(\"name\", \"default_name\")", context); + result = interpreter.interpret("z.input(\"name\", \"default_name\")", + context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals(1, context.getGui().getForms().size()); assertTrue(context.getGui().getForms().get("name") instanceof TextBox); @@ -157,11 +157,8 @@ public class FlinkInterpreterTest { assertEquals("default_name", textBox.getDefaultValue()); context = getInterpreterContext(); - result = - interpreter.interpret( - "z.checkbox(\"checkbox_1\", " - + "Seq(\"value_2\"), Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))", - context); + result = interpreter.interpret("z.checkbox(\"checkbox_1\", " + + "Seq(\"value_2\"), Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals(1, context.getGui().getForms().size()); assertTrue(context.getGui().getForms().get("checkbox_1") instanceof CheckBox); @@ -176,11 +173,8 @@ public class FlinkInterpreterTest { assertEquals("name_2", checkBox.getOptions()[1].getDisplayName()); context = getInterpreterContext(); - result = - interpreter.interpret( - "z.select(\"select_1\", Seq(\"value_2\"), " - + "Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))", - context); + result = interpreter.interpret("z.select(\"select_1\", Seq(\"value_2\"), " + + "Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals(1, context.getGui().getForms().size()); assertTrue(context.getGui().getForms().get("select_1") instanceof Select); @@ -198,25 +192,24 @@ public class FlinkInterpreterTest { @Test public void testCompletion() throws InterpreterException { - InterpreterResult result = - interpreter.interpret("val a=\"hello world\"", getInterpreterContext()); + InterpreterResult result = interpreter.interpret("val a=\"hello world\"", + getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals("a: String = hello world\n", output); - List<InterpreterCompletion> completions = - interpreter.completion("a.", 2, getInterpreterContext()); + List<InterpreterCompletion> completions = interpreter.completion("a.", 2, + getInterpreterContext()); assertTrue(completions.size() > 0); } + // Disable it for now as there's extra std output from flink shell. 
@Test public void testWordCount() throws InterpreterException, IOException { - interpreter.interpret( - "val text = benv.fromElements(\"To be or not to be\")", getInterpreterContext()); - interpreter.interpret( - "val counts = text.flatMap { _.toLowerCase.split(\" \") }" - + ".map { (_, 1) }.groupBy(0).sum(1)", + interpreter.interpret("val text = benv.fromElements(\"To be or not to be\")", getInterpreterContext()); + interpreter.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }" + + ".map { (_, 1) }.groupBy(0).sum(1)", getInterpreterContext()); InterpreterResult result = interpreter.interpret("counts.print()", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); @@ -232,31 +225,31 @@ public class FlinkInterpreterTest { private InterpreterContext getInterpreterContext() { output = ""; messageOutput = new ArrayList<>(); - InterpreterContext context = - InterpreterContext.builder() - .setInterpreterOut(new InterpreterOutput(null)) - .setAngularObjectRegistry(new AngularObjectRegistry("flink", null)) - .build(); - context.out = - new InterpreterOutput( - new InterpreterOutputListener() { - @Override - public void onUpdateAll(InterpreterOutput out) {} - - @Override - public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) { - try { - output = out.toInterpreterResultMessage().getData(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - @Override - public void onUpdate(int index, InterpreterResultMessageOutput out) { - messageOutput.add(out); - } - }); + InterpreterContext context = InterpreterContext.builder() + .setInterpreterOut(new InterpreterOutput(null)) + .setAngularObjectRegistry(new AngularObjectRegistry("flink", null)) + .build(); + context.out = new InterpreterOutput( + new InterpreterOutputListener() { + @Override + public void onUpdateAll(InterpreterOutput out) { + + } + + @Override + public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) { + try { + output = out.toInterpreterResultMessage().getData(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void onUpdate(int index, InterpreterResultMessageOutput out) { + messageOutput.add(out); + } + }); return context; } }
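Note on the MockFileSystem hunk above: the test seeds a HashMap with canned webhdfs JSON bodies keyed by "<path>?op=<NAME>" strings, so HDFSCommand runs against fixed responses instead of a live namenode. A minimal self-contained sketch of that technique, assuming nothing from the patch except the key format (CannedWebHdfs, stub and get are illustrative names, not from the commit):

import java.util.HashMap;
import java.util.Map;

// Canned webhdfs responses keyed by "<path>?op=<NAME>", mirroring the
// MockFileSystem technique: tests read fixed JSON instead of calling HDFS.
class CannedWebHdfs {
  private final Map<String, String> responses = new HashMap<>();

  void stub(String path, String op, String jsonBody) {
    responses.put(path + "?op=" + op, jsonBody);
  }

  String get(String path, String op) {
    String body = responses.get(path + "?op=" + op);
    if (body == null) {
      throw new IllegalStateException("no stub for " + path + "?op=" + op);
    }
    return body;
  }

  public static void main(String[] args) {
    CannedWebHdfs fs = new CannedWebHdfs();
    fs.stub("/tmp", "GETFILESTATUS",
        "{\"FileStatus\":{\"type\":\"DIRECTORY\",\"permission\":\"777\"}}");
    System.out.println(fs.get("/tmp", "GETFILESTATUS"));
  }
}

Keying on the literal request URL keeps the stub trivial: an un-stubbed request fails fast instead of silently returning an empty body.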
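Note on the null-check ternary in MockHDFSFileInterpreter.prepare(): the Flink hunks in this same commit already use the two-argument getProperty("zeppelin.flink.maxResult", "1000") form, and java.util.Properties offers the same defaulting overload, which would express the HDFS_MAXLENGTH fallback without the ternary. A sketch under that assumption (the "hdfs.maxlength" key is illustrative; the patch shows only the HDFS_MAXLENGTH constant, not its value):

import java.util.Properties;

// Properties.getProperty(key, default) expresses the same fallback as the
// "value == null ? \"1000\" : value" ternary in prepare().
public class MaxLengthDefault {
  public static void main(String[] args) {
    Properties props = new Properties();  // "hdfs.maxlength" not set
    int maxLength = Integer.parseInt(props.getProperty("hdfs.maxlength", "1000"));
    System.out.println(maxLength);        // prints 1000
  }
}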
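Note on the auto-completion assertions in HDFSFileInterpreterTest: completion("l", 0, null) is expected to return InterpreterCompletion("ls", "ls", CompletionType.command.name()), in other words a prefix match over the known commands. A sketch of that contract with plain strings standing in for InterpreterCompletion, so it runs without zeppelin-interpreter on the classpath (the command list is illustrative):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Prefix-match completion over a fixed command set. The real method wraps
// each match in InterpreterCompletion(name, value,
// CompletionType.command.name()); plain strings keep the sketch standalone.
public class PrefixCompletion {
  private static final List<String> COMMANDS = Arrays.asList("ls", "cd", "pwd");

  static List<String> completion(String buf) {
    List<String> matches = new ArrayList<>();
    for (String cmd : COMMANDS) {
      if (cmd.startsWith(buf)) {
        matches.add(cmd);
      }
    }
    return matches;
  }

  public static void main(String[] args) {
    System.out.println(completion("l"));  // [ls]
    System.out.println(completion("p"));  // [pwd]
  }
}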
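Note on getInterpreterContext() in FlinkInterpreterTest: output is captured by handing InterpreterOutput a listener whose onAppend callback copies the latest message into a field that the assertions read later. The same pattern reduced to a stand-in listener interface (OutputListener and OutputCapture are illustrative names, not Zeppelin types):

import java.nio.charset.StandardCharsets;

// Capture pattern from getInterpreterContext(): a listener records each
// appended chunk so assertions can inspect what the interpreter wrote.
// OutputListener stands in for Zeppelin's InterpreterOutputListener.
public class OutputCapture {
  interface OutputListener {
    void onAppend(byte[] chunk);
  }

  private final StringBuilder captured = new StringBuilder();
  private final OutputListener listener =
      chunk -> captured.append(new String(chunk, StandardCharsets.UTF_8));

  void write(String s) {
    // The real InterpreterOutput notifies its listener on each write.
    listener.onAppend(s.getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    OutputCapture out = new OutputCapture();
    out.write("a: String = hello world\n");
    System.out.print(out.captured);  // what the test's assertEquals reads
  }
}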