http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java index 4c21136..95674ea 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java @@ -20,22 +20,7 @@ package org.apache.zeppelin.livy; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.annotations.SerializedName; -import java.io.FileInputStream; -import java.io.IOException; -import java.nio.charset.Charset; -import java.security.KeyStore; -import java.security.Principal; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import javax.net.ssl.SSLContext; + import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.http.auth.AuthSchemeProvider; @@ -53,13 +38,6 @@ import org.apache.http.impl.auth.SPNegoSchemeFactory; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResultMessage; -import org.apache.zeppelin.interpreter.InterpreterUtils; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpEntity; @@ -75,7 +53,35 @@ import org.springframework.web.client.HttpServerErrorException; import org.springframework.web.client.RestClientException; import org.springframework.web.client.RestTemplate; -/** Base class for livy interpreters. */ +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.security.KeyStore; +import java.security.Principal; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.net.ssl.SSLContext; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResultMessage; +import org.apache.zeppelin.interpreter.InterpreterUtils; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; + +/** + * Base class for livy interpreters. + */ public abstract class BaseLivyInterpreter extends Interpreter { protected static final Logger LOGGER = LoggerFactory.getLogger(BaseLivyInterpreter.class); @@ -96,34 +102,32 @@ public abstract class BaseLivyInterpreter extends Interpreter { // delegate to sharedInterpreter when it is available protected LivySharedInterpreter sharedInterpreter; - Set<Object> paragraphsToCancel = - Collections.newSetFromMap(new ConcurrentHashMap<Object, Boolean>()); + Set<Object> paragraphsToCancel = Collections.newSetFromMap( + new ConcurrentHashMap<Object, Boolean>()); private ConcurrentHashMap<String, Integer> paragraphId2StmtProgressMap = new ConcurrentHashMap<>(); public BaseLivyInterpreter(Properties property) { super(property); this.livyURL = property.getProperty("zeppelin.livy.url"); - this.displayAppInfo = - Boolean.parseBoolean(property.getProperty("zeppelin.livy.displayAppInfo", "true")); - this.restartDeadSession = - Boolean.parseBoolean(property.getProperty("zeppelin.livy.restart_dead_session", "false")); - this.sessionCreationTimeout = - Integer.parseInt(property.getProperty("zeppelin.livy.session.create_timeout", 120 + "")); - this.pullStatusInterval = - Integer.parseInt( - property.getProperty("zeppelin.livy.pull_status.interval.millis", 1000 + "")); - this.maxLogLines = Integer.parseInt(property.getProperty("zeppelin.livy.maxLogLines", "1000")); + this.displayAppInfo = Boolean.parseBoolean( + property.getProperty("zeppelin.livy.displayAppInfo", "true")); + this.restartDeadSession = Boolean.parseBoolean( + property.getProperty("zeppelin.livy.restart_dead_session", "false")); + this.sessionCreationTimeout = Integer.parseInt( + property.getProperty("zeppelin.livy.session.create_timeout", 120 + "")); + this.pullStatusInterval = Integer.parseInt( + property.getProperty("zeppelin.livy.pull_status.interval.millis", 1000 + "")); + this.maxLogLines = Integer.parseInt(property.getProperty("zeppelin.livy.maxLogLines", + "1000")); this.restTemplate = createRestTemplate(); if (!StringUtils.isBlank(property.getProperty("zeppelin.livy.http.headers"))) { String[] headers = property.getProperty("zeppelin.livy.http.headers").split(";"); for (String header : headers) { String[] splits = header.split(":", -1); if (splits.length != 2) { - throw new RuntimeException( - "Invalid format of http headers: " - + header - + ", valid http header format is HEADER_NAME:HEADER_VALUE"); + throw new RuntimeException("Invalid format of http headers: " + header + + ", valid http header format is HEADER_NAME:HEADER_VALUE"); } customHeaders.put(splits[0].trim(), envSubstitute(splits[1].trim())); } @@ -159,8 +163,8 @@ public abstract class BaseLivyInterpreter extends Interpreter { initLivySession(); } } catch (LivyException e) { - String msg = - "Fail to create session, please check livy interpreter log and " + "livy server log"; + String msg = "Fail to create session, please check livy interpreter log and " + + "livy server log"; throw new InterpreterException(msg, e); } } @@ -187,17 +191,14 @@ public abstract class BaseLivyInterpreter extends Interpreter { sessionInfo.appId = extractAppId(); } - if (sessionInfo.appInfo == null - || StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) { + if (sessionInfo.appInfo == null || + StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) { sessionInfo.webUIAddress = extractWebUIAddress(); } else { sessionInfo.webUIAddress = sessionInfo.appInfo.get("sparkUiUrl"); } - LOGGER.info( - "Create livy session successfully with sessionId: {}, appId: {}, webUI: {}", - sessionInfo.id, - sessionInfo.appId, - sessionInfo.webUIAddress); + LOGGER.info("Create livy session successfully with sessionId: {}, appId: {}, webUI: {}", + sessionInfo.id, sessionInfo.appId, sessionInfo.webUIAddress); } else { LOGGER.info("Create livy session successfully with sessionId: {}", this.sessionInfo.id); } @@ -234,20 +235,20 @@ public abstract class BaseLivyInterpreter extends Interpreter { return interpret(st, null, context.getParagraphId(), this.displayAppInfo, true, true); } catch (LivyException e) { LOGGER.error("Fail to interpret:" + st, e); - return new InterpreterResult( - InterpreterResult.Code.ERROR, InterpreterUtils.getMostRelevantMessage(e)); + return new InterpreterResult(InterpreterResult.Code.ERROR, + InterpreterUtils.getMostRelevantMessage(e)); } } @Override - public List<InterpreterCompletion> completion( - String buf, int cursor, InterpreterContext interpreterContext) { + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { List<InterpreterCompletion> candidates = Collections.emptyList(); try { candidates = callCompletion(new CompletionRequest(buf, getSessionKind(), cursor)); } catch (SessionNotFoundException e) { - LOGGER.warn( - "Livy session {} is expired. Will return empty list of candidates.", getSessionInfo().id); + LOGGER.warn("Livy session {} is expired. Will return empty list of candidates.", + getSessionInfo().id); } catch (LivyException le) { logger.error("Failed to call code completions. Will return empty list of candidates", le); } @@ -257,10 +258,8 @@ public abstract class BaseLivyInterpreter extends Interpreter { private List<InterpreterCompletion> callCompletion(CompletionRequest req) throws LivyException { List<InterpreterCompletion> candidates = new ArrayList<>(); try { - CompletionResponse resp = - CompletionResponse.fromJson( - callRestAPI( - "/sessions/" + getSessionInfo().id + "/completion", "POST", req.toJson())); + CompletionResponse resp = CompletionResponse.fromJson( + callRestAPI("/sessions/" + getSessionInfo().id + "/completion", "POST", req.toJson())); for (String candidate : resp.candidates) { candidates.add(new InterpreterCompletion(candidate, candidate, StringUtils.EMPTY)); } @@ -299,51 +298,37 @@ public abstract class BaseLivyInterpreter extends Interpreter { return 0; } - private SessionInfo createSession(String user, String kind) throws LivyException { + private SessionInfo createSession(String user, String kind) + throws LivyException { try { Map<String, String> conf = new HashMap<>(); for (Map.Entry<Object, Object> entry : getProperties().entrySet()) { - if (entry.getKey().toString().startsWith("livy.spark.") - && !entry.getValue().toString().isEmpty()) { + if (entry.getKey().toString().startsWith("livy.spark.") && + !entry.getValue().toString().isEmpty()) { conf.put(entry.getKey().toString().substring(5), entry.getValue().toString()); } } - CreateSessionRequest request = - new CreateSessionRequest( - kind, user == null || user.equals("anonymous") ? null : user, conf); - SessionInfo sessionInfo = - SessionInfo.fromJson(callRestAPI("/sessions", "POST", request.toJson())); + CreateSessionRequest request = new CreateSessionRequest(kind, + user == null || user.equals("anonymous") ? null : user, conf); + SessionInfo sessionInfo = SessionInfo.fromJson( + callRestAPI("/sessions", "POST", request.toJson())); long start = System.currentTimeMillis(); // pull the session status until it is idle or timeout while (!sessionInfo.isReady()) { if ((System.currentTimeMillis() - start) / 1000 > sessionCreationTimeout) { - String msg = - "The creation of session " - + sessionInfo.id - + " is timeout within " - + sessionCreationTimeout - + " seconds, appId: " - + sessionInfo.appId - + ", log:\n" - + StringUtils.join(getSessionLog(sessionInfo.id).log, "\n"); + String msg = "The creation of session " + sessionInfo.id + " is timeout within " + + sessionCreationTimeout + " seconds, appId: " + sessionInfo.appId + + ", log:\n" + StringUtils.join(getSessionLog(sessionInfo.id).log, "\n"); throw new LivyException(msg); } Thread.sleep(pullStatusInterval); sessionInfo = getSessionInfo(sessionInfo.id); - LOGGER.info( - "Session {} is in state {}, appId {}", - sessionInfo.id, - sessionInfo.state, + LOGGER.info("Session {} is in state {}, appId {}", sessionInfo.id, sessionInfo.state, sessionInfo.appId); if (sessionInfo.isFinished()) { - String msg = - "Session " - + sessionInfo.id - + " is finished, appId: " - + sessionInfo.appId - + ", log:\n" - + StringUtils.join(getSessionLog(sessionInfo.id).log, "\n"); + String msg = "Session " + sessionInfo.id + " is finished, appId: " + sessionInfo.appId + + ", log:\n" + StringUtils.join(getSessionLog(sessionInfo.id).log, "\n"); throw new LivyException(msg); } } @@ -359,34 +344,25 @@ public abstract class BaseLivyInterpreter extends Interpreter { } private SessionLog getSessionLog(int sessionId) throws LivyException { - return SessionLog.fromJson( - callRestAPI("/sessions/" + sessionId + "/log?size=" + maxLogLines, "GET")); + return SessionLog.fromJson(callRestAPI("/sessions/" + sessionId + "/log?size=" + maxLogLines, + "GET")); } - public InterpreterResult interpret( - String code, - String paragraphId, - boolean displayAppInfo, - boolean appendSessionExpired, - boolean appendSessionDead) - throws LivyException { - return interpret( - code, - sharedInterpreter.isSupported() ? getSessionKind() : null, - paragraphId, - displayAppInfo, - appendSessionExpired, - appendSessionDead); - } - - public InterpreterResult interpret( - String code, - String codeType, - String paragraphId, - boolean displayAppInfo, - boolean appendSessionExpired, - boolean appendSessionDead) - throws LivyException { + public InterpreterResult interpret(String code, + String paragraphId, + boolean displayAppInfo, + boolean appendSessionExpired, + boolean appendSessionDead) throws LivyException { + return interpret(code, sharedInterpreter.isSupported() ? getSessionKind() : null, + paragraphId, displayAppInfo, appendSessionExpired, appendSessionDead); + } + + public InterpreterResult interpret(String code, + String codeType, + String paragraphId, + boolean displayAppInfo, + boolean appendSessionExpired, + boolean appendSessionDead) throws LivyException { StatementInfo stmtInfo = null; boolean sessionExpired = false; boolean sessionDead = false; @@ -417,9 +393,8 @@ public abstract class BaseLivyInterpreter extends Interpreter { } stmtInfo = executeStatement(new ExecuteRequest(code, codeType)); } else { - throw new LivyException( - "%html <font color=\"red\">Livy session is dead somehow, " - + "please check log to see why it is dead, and then restart livy interpreter</font>"); + throw new LivyException("%html <font color=\"red\">Livy session is dead somehow, " + + "please check log to see why it is dead, and then restart livy interpreter</font>"); } } @@ -441,8 +416,8 @@ public abstract class BaseLivyInterpreter extends Interpreter { } } if (appendSessionExpired || appendSessionDead) { - return appendSessionExpireDead( - getResultFromStatementInfo(stmtInfo, displayAppInfo), sessionExpired, sessionDead); + return appendSessionExpireDead(getResultFromStatementInfo(stmtInfo, displayAppInfo), + sessionExpired, sessionDead); } else { return getResultFromStatementInfo(stmtInfo, displayAppInfo); } @@ -485,20 +460,20 @@ public abstract class BaseLivyInterpreter extends Interpreter { } } - private InterpreterResult appendSessionExpireDead( - InterpreterResult result, boolean sessionExpired, boolean sessionDead) { + private InterpreterResult appendSessionExpireDead(InterpreterResult result, + boolean sessionExpired, + boolean sessionDead) { InterpreterResult result2 = new InterpreterResult(result.code()); if (sessionExpired) { - result2.add( - InterpreterResult.Type.HTML, - "<font color=\"red\">Previous livy session is expired, new livy session is created. " - + "Paragraphs that depend on this paragraph need to be re-executed!</font>"); + result2.add(InterpreterResult.Type.HTML, + "<font color=\"red\">Previous livy session is expired, new livy session is created. " + + "Paragraphs that depend on this paragraph need to be re-executed!</font>"); + } if (sessionDead) { - result2.add( - InterpreterResult.Type.HTML, - "<font color=\"red\">Previous livy session is dead, new livy session is created. " - + "Paragraphs that depend on this paragraph need to be re-executed!</font>"); + result2.add(InterpreterResult.Type.HTML, + "<font color=\"red\">Previous livy session is dead, new livy session is created. " + + "Paragraphs that depend on this paragraph need to be re-executed!</font>"); } for (InterpreterResultMessage message : result.message()) { @@ -507,8 +482,8 @@ public abstract class BaseLivyInterpreter extends Interpreter { return result2; } - private InterpreterResult getResultFromStatementInfo( - StatementInfo stmtInfo, boolean displayAppInfo) { + private InterpreterResult getResultFromStatementInfo(StatementInfo stmtInfo, + boolean displayAppInfo) { if (stmtInfo.output != null && stmtInfo.output.isError()) { InterpreterResult result = new InterpreterResult(InterpreterResult.Code.ERROR); StringBuilder sb = new StringBuilder(); @@ -530,7 +505,7 @@ public abstract class BaseLivyInterpreter extends Interpreter { // This case should never happen, just in case return new InterpreterResult(InterpreterResult.Code.ERROR, "Empty output"); } else { - // TODO(zjffdu) support other types of data (like json, image and etc) + //TODO(zjffdu) support other types of data (like json, image and etc) String result = stmtInfo.output.data.plainText; // check table magic result first @@ -551,13 +526,11 @@ public abstract class BaseLivyInterpreter extends Interpreter { outputBuilder.append(StringUtils.join(row, "\t")); outputBuilder.append("\n"); } - return new InterpreterResult( - InterpreterResult.Code.SUCCESS, InterpreterResult.Type.TABLE, outputBuilder.toString()); + return new InterpreterResult(InterpreterResult.Code.SUCCESS, + InterpreterResult.Type.TABLE, outputBuilder.toString()); } else if (stmtInfo.output.data.imagePng != null) { - return new InterpreterResult( - InterpreterResult.Code.SUCCESS, - InterpreterResult.Type.IMG, - (String) stmtInfo.output.data.imagePng); + return new InterpreterResult(InterpreterResult.Code.SUCCESS, + InterpreterResult.Type.IMG, (String) stmtInfo.output.data.imagePng); } else if (result != null) { result = result.trim(); if (result.startsWith("<link") @@ -571,15 +544,9 @@ public abstract class BaseLivyInterpreter extends Interpreter { if (displayAppInfo) { InterpreterResult interpreterResult = new InterpreterResult(InterpreterResult.Code.SUCCESS); interpreterResult.add(result); - String appInfoHtml = - "<hr/>Spark Application Id: " - + sessionInfo.appId - + "<br/>" - + "Spark WebUI: <a href=\"" - + sessionInfo.webUIAddress - + "\">" - + sessionInfo.webUIAddress - + "</a>"; + String appInfoHtml = "<hr/>Spark Application Id: " + sessionInfo.appId + "<br/>" + + "Spark WebUI: <a href=\"" + sessionInfo.webUIAddress + "\">" + + sessionInfo.webUIAddress + "</a>"; interpreterResult.add(InterpreterResult.Type.HTML, appInfoHtml); return interpreterResult; } else { @@ -588,13 +555,14 @@ public abstract class BaseLivyInterpreter extends Interpreter { } } - private StatementInfo executeStatement(ExecuteRequest executeRequest) throws LivyException { - return StatementInfo.fromJson( - callRestAPI( - "/sessions/" + sessionInfo.id + "/statements", "POST", executeRequest.toJson())); + private StatementInfo executeStatement(ExecuteRequest executeRequest) + throws LivyException { + return StatementInfo.fromJson(callRestAPI("/sessions/" + sessionInfo.id + "/statements", "POST", + executeRequest.toJson())); } - private StatementInfo getStatementInfo(int statementId) throws LivyException { + private StatementInfo getStatementInfo(int statementId) + throws LivyException { return StatementInfo.fromJson( callRestAPI("/sessions/" + sessionInfo.id + "/statements/" + statementId, "GET")); } @@ -603,11 +571,12 @@ public abstract class BaseLivyInterpreter extends Interpreter { callRestAPI("/sessions/" + sessionInfo.id + "/statements/" + statementId + "/cancel", "POST"); } + private RestTemplate createRestTemplate() { String keytabLocation = getProperty("zeppelin.livy.keytab"); String principal = getProperty("zeppelin.livy.principal"); - boolean isSpnegoEnabled = - StringUtils.isNotEmpty(keytabLocation) && StringUtils.isNotEmpty(principal); + boolean isSpnegoEnabled = StringUtils.isNotEmpty(keytabLocation) && + StringUtils.isNotEmpty(principal); HttpClient httpClient = null; if (livyURL.startsWith("https:")) { @@ -617,37 +586,37 @@ public abstract class BaseLivyInterpreter extends Interpreter { throw new RuntimeException("No zeppelin.livy.ssl.trustStore specified for livy ssl"); } if (StringUtils.isBlank(password)) { - throw new RuntimeException( - "No zeppelin.livy.ssl.trustStorePassword specified " + "for livy ssl"); + throw new RuntimeException("No zeppelin.livy.ssl.trustStorePassword specified " + + "for livy ssl"); } FileInputStream inputStream = null; try { inputStream = new FileInputStream(keystoreFile); KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); trustStore.load(new FileInputStream(keystoreFile), password.toCharArray()); - SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(trustStore).build(); + SSLContext sslContext = SSLContexts.custom() + .loadTrustMaterial(trustStore) + .build(); SSLConnectionSocketFactory csf = new SSLConnectionSocketFactory(sslContext); HttpClientBuilder httpClientBuilder = HttpClients.custom().setSSLSocketFactory(csf); - RequestConfig reqConfig = - new RequestConfig() { - @Override - public boolean isAuthenticationEnabled() { - return true; - } - }; + RequestConfig reqConfig = new RequestConfig() { + @Override + public boolean isAuthenticationEnabled() { + return true; + } + }; httpClientBuilder.setDefaultRequestConfig(reqConfig); - Credentials credentials = - new Credentials() { - @Override - public String getPassword() { - return null; - } - - @Override - public Principal getUserPrincipal() { - return null; - } - }; + Credentials credentials = new Credentials() { + @Override + public String getPassword() { + return null; + } + + @Override + public Principal getUserPrincipal() { + return null; + } + }; CredentialsProvider credsProvider = new BasicCredentialsProvider(); credsProvider.setCredentials(AuthScope.ANY, credentials); httpClientBuilder.setDefaultCredentialsProvider(credsProvider); @@ -687,9 +656,8 @@ public abstract class BaseLivyInterpreter extends Interpreter { restTemplate = new RestTemplate(new HttpComponentsClientHttpRequestFactory(httpClient)); } } - restTemplate - .getMessageConverters() - .add(0, new StringHttpMessageConverter(Charset.forName("UTF-8"))); + restTemplate.getMessageConverters().add(0, + new StringHttpMessageConverter(Charset.forName("UTF-8"))); return restTemplate; } @@ -721,10 +689,8 @@ public abstract class BaseLivyInterpreter extends Interpreter { } } catch (HttpClientErrorException e) { response = new ResponseEntity(e.getResponseBodyAsString(), e.getStatusCode()); - LOGGER.error( - String.format( - "Error with %s StatusCode: %s", - response.getStatusCode().value(), e.getResponseBodyAsString())); + LOGGER.error(String.format("Error with %s StatusCode: %s", + response.getStatusCode().value(), e.getResponseBodyAsString())); } catch (RestClientException e) { // Exception happens when kerberos is enabled. if (e.getCause() instanceof HttpClientErrorException) { @@ -732,10 +698,8 @@ public abstract class BaseLivyInterpreter extends Interpreter { if (cause.getResponseBodyAsString().matches(SESSION_NOT_FOUND_PATTERN)) { throw new SessionNotFoundException(cause.getResponseBodyAsString()); } - throw new LivyException( - cause.getResponseBodyAsString() - + "\n" - + ExceptionUtils.getFullStackTrace(ExceptionUtils.getRootCause(e))); + throw new LivyException(cause.getResponseBodyAsString() + "\n" + + ExceptionUtils.getFullStackTrace(ExceptionUtils.getRootCause(e))); } if (e instanceof HttpServerErrorException) { HttpServerErrorException errorException = (HttpServerErrorException) e; @@ -750,27 +714,25 @@ public abstract class BaseLivyInterpreter extends Interpreter { if (response == null) { throw new LivyException("No http response returned"); } - LOGGER.debug( - "Get response, StatusCode: {}, responseBody: {}", - response.getStatusCode(), + LOGGER.debug("Get response, StatusCode: {}, responseBody: {}", response.getStatusCode(), response.getBody()); - if (response.getStatusCode().value() == 200 || response.getStatusCode().value() == 201) { + if (response.getStatusCode().value() == 200 + || response.getStatusCode().value() == 201) { return response.getBody(); } else if (response.getStatusCode().value() == 404) { if (response.getBody().matches(SESSION_NOT_FOUND_PATTERN)) { throw new SessionNotFoundException(response.getBody()); } else { - throw new APINotFoundException( - "No rest api found for " + targetURL + ", " + response.getStatusCode()); + throw new APINotFoundException("No rest api found for " + targetURL + + ", " + response.getStatusCode()); } } else { String responseString = response.getBody(); if (responseString.contains("CreateInteractiveRequest[\\\"master\\\"]")) { return responseString; } - throw new LivyException( - String.format( - "Error with %s StatusCode: %s", response.getStatusCode().value(), responseString)); + throw new LivyException(String.format("Error with %s StatusCode: %s", + response.getStatusCode().value(), responseString)); } } @@ -778,23 +740,21 @@ public abstract class BaseLivyInterpreter extends Interpreter { try { callRestAPI("/sessions/" + sessionId, "DELETE"); } catch (Exception e) { - LOGGER.error( - String.format("Error closing session for user with session ID: %s", sessionId), e); + LOGGER.error(String.format("Error closing session for user with session ID: %s", + sessionId), e); } } /* - * We create these POJO here to accommodate livy 0.3 which is not released yet. livy rest api has - * some changes from version to version. So we create these POJO in zeppelin side to accommodate - * incompatibility between versions. Later, when livy become more stable, we could just depend on - * livy client jar. - */ + * We create these POJO here to accommodate livy 0.3 which is not released yet. livy rest api has + * some changes from version to version. So we create these POJO in zeppelin side to accommodate + * incompatibility between versions. Later, when livy become more stable, we could just depend on + * livy client jar. + */ private static class CreateSessionRequest { public final String kind; - @SerializedName("proxyUser") public final String user; - public final Map<String, String> conf; CreateSessionRequest(String kind, String user, Map<String, String> conf) { @@ -808,7 +768,9 @@ public abstract class BaseLivyInterpreter extends Interpreter { } } - /** */ + /** + * + */ public static class SessionInfo { public final int id; @@ -821,15 +783,8 @@ public abstract class BaseLivyInterpreter extends Interpreter { public final Map<String, String> appInfo; public final List<String> log; - public SessionInfo( - int id, - String appId, - String owner, - String proxyUser, - String state, - String kind, - Map<String, String> appInfo, - List<String> log) { + public SessionInfo(int id, String appId, String owner, String proxyUser, String state, + String kind, Map<String, String> appInfo, List<String> log) { this.id = id; this.appId = appId; this.owner = owner; @@ -859,7 +814,8 @@ public abstract class BaseLivyInterpreter extends Interpreter { public int size; public List<String> log; - SessionLog() {} + SessionLog() { + } public static SessionLog fromJson(String json) { return gson.fromJson(json, SessionLog.class); @@ -886,7 +842,8 @@ public abstract class BaseLivyInterpreter extends Interpreter { public double progress; public StatementOutput output; - StatementInfo() {} + StatementInfo() { + } public static StatementInfo fromJson(String json) { String rightJson = ""; @@ -931,13 +888,10 @@ public abstract class BaseLivyInterpreter extends Interpreter { private static class Data { @SerializedName("text/plain") public String plainText; - @SerializedName("image/png") public String imagePng; - @SerializedName("application/json") public String applicationJson; - @SerializedName("application/vnd.livy.table.v1+json") public TableMagic applicationLivyTableJson; }
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/livy/src/main/java/org/apache/zeppelin/livy/LivyException.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyException.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyException.java index 4039004..c14351f 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivyException.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyException.java @@ -19,9 +19,12 @@ package org.apache.zeppelin.livy; import org.apache.zeppelin.interpreter.InterpreterException; -/** Livy api related exception. */ +/** + * Livy api related exception. + */ public class LivyException extends InterpreterException { - public LivyException() {} + public LivyException() { + } public LivyException(String message) { super(message); @@ -35,8 +38,8 @@ public class LivyException extends InterpreterException { super(cause); } - public LivyException( - String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + public LivyException(String message, Throwable cause, boolean enableSuppression, + boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/livy/src/main/java/org/apache/zeppelin/livy/LivyPySpark3Interpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyPySpark3Interpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyPySpark3Interpreter.java index d074e32..174c2c0 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivyPySpark3Interpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyPySpark3Interpreter.java @@ -19,7 +19,10 @@ package org.apache.zeppelin.livy; import java.util.Properties; -/** Livy PySpark interpreter for Zeppelin. */ + +/** + * Livy PySpark interpreter for Zeppelin. + */ public class LivyPySpark3Interpreter extends LivyPySparkBaseInterpreter { public LivyPySpark3Interpreter(Properties property) { @@ -30,4 +33,5 @@ public class LivyPySpark3Interpreter extends LivyPySparkBaseInterpreter { public String getSessionKind() { return "pyspark3"; } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkBaseInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkBaseInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkBaseInterpreter.java index c5729d1..32399c6 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkBaseInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkBaseInterpreter.java @@ -19,7 +19,9 @@ package org.apache.zeppelin.livy; import java.util.Properties; -/** Base class for PySpark Interpreter. */ +/** + * Base class for PySpark Interpreter. + */ public abstract class LivyPySparkBaseInterpreter extends BaseLivyInterpreter { public LivyPySparkBaseInterpreter(Properties property) { @@ -29,21 +31,22 @@ public abstract class LivyPySparkBaseInterpreter extends BaseLivyInterpreter { @Override protected String extractAppId() throws LivyException { return extractStatementResult( - interpret("sc.applicationId", null, false, false, false).message().get(0).getData()); + interpret("sc.applicationId", null, false, false, false).message() + .get(0).getData()); } @Override protected String extractWebUIAddress() throws LivyException { return extractStatementResult( - interpret("sc._jsc.sc().ui().get().appUIAddress()", null, false, false, false) - .message() - .get(0) - .getData()); + interpret( + "sc._jsc.sc().ui().get().appUIAddress()", null, false, false, false) + .message().get(0).getData()); } /** - * Extract the eval result of spark shell, e.g. extract application_1473129941656_0048 from - * following: u'application_1473129941656_0048' + * Extract the eval result of spark shell, e.g. extract application_1473129941656_0048 + * from following: + * u'application_1473129941656_0048' * * @param result * @return @@ -53,8 +56,8 @@ public abstract class LivyPySparkBaseInterpreter extends BaseLivyInterpreter { if ((pos = result.indexOf("'")) >= 0) { return result.substring(pos + 1, result.length() - 1).trim(); } else { - throw new RuntimeException( - "No result can be extracted from '" + result + "', " + "something must be wrong"); + throw new RuntimeException("No result can be extracted from '" + result + "', " + + "something must be wrong"); } } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java index 9b15274..d664bbe 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java @@ -19,7 +19,10 @@ package org.apache.zeppelin.livy; import java.util.Properties; -/** Livy PySpark interpreter for Zeppelin. */ + +/** + * Livy PySpark interpreter for Zeppelin. + */ public class LivyPySparkInterpreter extends LivyPySparkBaseInterpreter { public LivyPySparkInterpreter(Properties property) { @@ -30,4 +33,6 @@ public class LivyPySparkInterpreter extends LivyPySparkBaseInterpreter { public String getSessionKind() { return "pyspark"; } + + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/livy/src/main/java/org/apache/zeppelin/livy/LivySharedInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySharedInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySharedInterpreter.java index df0fd07..c912dc9 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySharedInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySharedInterpreter.java @@ -17,16 +17,20 @@ package org.apache.zeppelin.livy; -import java.util.Properties; import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -/** Livy Interpreter for shared kind which share SparkContext across spark/pyspark/r. */ +/** + * Livy Interpreter for shared kind which share SparkContext across spark/pyspark/r. + */ public class LivySharedInterpreter extends BaseLivyInterpreter { private static final Logger LOGGER = LoggerFactory.getLogger(LivySharedInterpreter.class); @@ -59,8 +63,8 @@ public class LivySharedInterpreter extends BaseLivyInterpreter { isSupported = false; } } catch (LivyException e) { - String msg = - "Fail to create session, please check livy interpreter log and " + "livy server log"; + String msg = "Fail to create session, please check livy interpreter log and " + + "livy server log"; throw new InterpreterException(msg, e); } } @@ -78,8 +82,8 @@ public class LivySharedInterpreter extends BaseLivyInterpreter { return interpret(st, codeType, context.getParagraphId(), this.displayAppInfo, true, true); } catch (LivyException e) { LOGGER.error("Fail to interpret:" + st, e); - return new InterpreterResult( - InterpreterResult.Code.ERROR, InterpreterUtils.getMostRelevantMessage(e)); + return new InterpreterResult(InterpreterResult.Code.ERROR, + InterpreterUtils.getMostRelevantMessage(e)); } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java index e7b3570..ad62e9b 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java @@ -19,7 +19,9 @@ package org.apache.zeppelin.livy; import java.util.Properties; -/** Livy Spark interpreter for Zeppelin. */ +/** + * Livy Spark interpreter for Zeppelin. + */ public class LivySparkInterpreter extends BaseLivyInterpreter { public LivySparkInterpreter(Properties property) { @@ -34,7 +36,8 @@ public class LivySparkInterpreter extends BaseLivyInterpreter { @Override protected String extractAppId() throws LivyException { return extractStatementResult( - interpret("sc.applicationId", null, false, false, false).message().get(0).getData()); + interpret("sc.applicationId", null, false, false, false).message() + .get(0).getData()); } @Override @@ -42,25 +45,17 @@ public class LivySparkInterpreter extends BaseLivyInterpreter { interpret( "val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get", null, - null, - false, - false, - false); + null, false, false, false); return extractStatementResult( interpret( - "webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", - null, - false, - false, - false) - .message() - .get(0) - .getData()); + "webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", null, false, false, false) + .message().get(0).getData()); } /** - * Extract the eval result of spark shell, e.g. extract application_1473129941656_0048 from - * following: res0: String = application_1473129941656_0048 + * Extract the eval result of spark shell, e.g. extract application_1473129941656_0048 + * from following: + * res0: String = application_1473129941656_0048 * * @param result * @return @@ -70,8 +65,8 @@ public class LivySparkInterpreter extends BaseLivyInterpreter { if ((pos = result.indexOf("=")) >= 0) { return result.substring(pos + 1).trim(); } else { - throw new RuntimeException( - "No result can be extracted from '" + result + "', " + "something must be wrong"); + throw new RuntimeException("No result can be extracted from '" + result + "', " + + "something must be wrong"); } } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java index 6aab4f0..c270437 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java @@ -19,7 +19,10 @@ package org.apache.zeppelin.livy; import java.util.Properties; -/** Livy PySpark interpreter for Zeppelin. */ + +/** + * Livy PySpark interpreter for Zeppelin. + */ public class LivySparkRInterpreter extends BaseLivyInterpreter { public LivySparkRInterpreter(Properties property) { @@ -33,13 +36,13 @@ public class LivySparkRInterpreter extends BaseLivyInterpreter { @Override protected String extractAppId() throws LivyException { - // TODO(zjffdu) depends on SparkR + //TODO(zjffdu) depends on SparkR return null; } @Override protected String extractWebUIAddress() throws LivyException { - // TODO(zjffdu) depends on SparkR + //TODO(zjffdu) depends on SparkR return null; } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java index 12c641a..b2e18f3 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java @@ -17,11 +17,6 @@ package org.apache.zeppelin.livy; -import static org.apache.commons.lang.StringEscapeUtils.escapeJavaScript; - -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; import org.apache.commons.lang.StringUtils; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; @@ -33,7 +28,15 @@ import org.apache.zeppelin.interpreter.ResultMessages; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; -/** Livy SparkSQL Interpreter for Zeppelin. */ +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static org.apache.commons.lang.StringEscapeUtils.escapeJavaScript; + +/** + * Livy SparkSQL Interpreter for Zeppelin. + */ public class LivySparkSQLInterpreter extends BaseLivyInterpreter { public static final String ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE = "zeppelin.livy.spark.sql.field.truncate"; @@ -68,13 +71,13 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter { // As we don't know whether livyserver use spark2 or spark1, so we will detect SparkSession // to judge whether it is using spark2. try { - InterpreterContext context = - InterpreterContext.builder().setInterpreterOut(new InterpreterOutput(null)).build(); + InterpreterContext context = InterpreterContext.builder() + .setInterpreterOut(new InterpreterOutput(null)) + .build(); InterpreterResult result = sparkInterpreter.interpret("spark", context); - if (result.code() == InterpreterResult.Code.SUCCESS - && result.message().get(0).getData().contains("org.apache.spark.sql.SparkSession")) { - LOGGER.info( - "SparkSession is detected so we are using spark 2.x for session {}", + if (result.code() == InterpreterResult.Code.SUCCESS && + result.message().get(0).getData().contains("org.apache.spark.sql.SparkSession")) { + LOGGER.info("SparkSession is detected so we are using spark 2.x for session {}", sparkInterpreter.getSessionInfo().id); isSpark2 = true; } else { @@ -86,14 +89,12 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter { // create SqlContext if it is not available, as in livy 0.2 sqlContext // is not available. LOGGER.info("sqlContext is not detected, try to create SQLContext by ourselves"); - result = - sparkInterpreter.interpret( - "val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n" - + "import sqlContext.implicits._", - context); + result = sparkInterpreter.interpret( + "val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n" + + "import sqlContext.implicits._", context); if (result.code() == InterpreterResult.Code.ERROR) { - throw new LivyException( - "Fail to create SQLContext," + result.message().get(0).getData()); + throw new LivyException("Fail to create SQLContext," + + result.message().get(0).getData()); } } } @@ -112,10 +113,11 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter { // use triple quote so that we don't need to do string escape. String sqlQuery = null; if (isSpark2) { - sqlQuery = "spark.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ", " + truncate + ")"; + sqlQuery = "spark.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ", " + + truncate + ")"; } else { - sqlQuery = - "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ", " + truncate + ")"; + sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ", " + + truncate + ")"; } InterpreterResult result = sparkInterpreter.interpret(sqlQuery, context); if (result.code() == InterpreterResult.Code.SUCCESS) { @@ -128,9 +130,8 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter { List<String> rows = parseSQLOutput(message.getData()); result2.add(InterpreterResult.Type.TABLE, StringUtils.join(rows, "\n")); if (rows.size() >= (maxResult + 1)) { - result2.add( - ResultMessages.getExceedsLimitRowsMessage( - maxResult, ZEPPELIN_LIVY_SPARK_SQL_MAX_RESULT)); + result2.add(ResultMessages.getExceedsLimitRowsMessage(maxResult, + ZEPPELIN_LIVY_SPARK_SQL_MAX_RESULT)); } } else { result2.add(message.getType(), message.getData()); @@ -142,8 +143,8 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter { } } catch (Exception e) { LOGGER.error("Exception in LivySparkSQLInterpreter while interpret ", e); - return new InterpreterResult( - InterpreterResult.Code.ERROR, InterpreterUtils.getMostRelevantMessage(e)); + return new InterpreterResult(InterpreterResult.Code.ERROR, + InterpreterUtils.getMostRelevantMessage(e)); } } @@ -201,7 +202,9 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter { return rows; } - /** Represent the start and end index of each cell. */ + /** + * Represent the start and end index of each cell. + */ private static class Pair { private int start; private int end; @@ -220,9 +223,8 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter { public Scheduler getScheduler() { if (concurrentSQL()) { int maxConcurrency = 10; - return SchedulerFactory.singleton() - .createOrGetParallelScheduler( - LivySparkInterpreter.class.getName() + this.hashCode(), maxConcurrency); + return SchedulerFactory.singleton().createOrGetParallelScheduler( + LivySparkInterpreter.class.getName() + this.hashCode(), maxConcurrency); } else { if (sparkInterpreter != null) { return sparkInterpreter.getScheduler(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java index 1d56e83..55ebd57 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java @@ -20,7 +20,9 @@ package org.apache.zeppelin.livy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Provide reading comparing capability of livy version. */ +/** + * Provide reading comparing capability of livy version. + */ public class LivyVersion { private static final Logger logger = LoggerFactory.getLogger(LivyVersion.class); @@ -50,8 +52,8 @@ public class LivyVersion { // version is always 5 digits. (e.g. 2.0.0 -> 20000, 1.6.2 -> 10602) version = Integer.parseInt(String.format("%d%02d%02d", major, minor, patch)); } catch (Exception e) { - logger.error( - "Can not recognize Livy version " + versionString + ". Assume it's a future release", e); + logger.error("Can not recognize Livy version " + versionString + + ". Assume it's a future release", e); // assume it is future release version = 99999; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/livy/src/main/java/org/apache/zeppelin/livy/SessionDeadException.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/SessionDeadException.java b/livy/src/main/java/org/apache/zeppelin/livy/SessionDeadException.java index dfbb56e..5811790 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/SessionDeadException.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/SessionDeadException.java @@ -17,4 +17,6 @@ package org.apache.zeppelin.livy; -public class SessionDeadException extends LivyException {} +public class SessionDeadException extends LivyException { + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/livy/src/main/java/org/apache/zeppelin/livy/SessionNotFoundException.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/SessionNotFoundException.java b/livy/src/main/java/org/apache/zeppelin/livy/SessionNotFoundException.java index 3f56116..4547057 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/SessionNotFoundException.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/SessionNotFoundException.java @@ -17,7 +17,9 @@ package org.apache.zeppelin.livy; -/** */ +/** + * + */ public class SessionNotFoundException extends LivyException { public SessionNotFoundException(String message) {
