Repository: zeppelin Updated Branches: refs/heads/master e3315b2d2 -> 9d40013a9
ZEPPELIN-2261. Support to connect with livy through https ### What is this PR for? Livy server support https, but the currently livy interpreter doesn't support it. This PR is for for the supporting to connect with livy through https ### What type of PR is it? [Improvement] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-2261 ### How should this be tested? Tested manually on livy server with ssl enabled. ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #2139 from zjffdu/ZEPPELIN-2261 and squashes the following commits: 52fc204 [Jeff Zhang] address comment 53230c3 [Jeff Zhang] [ZEPPELIN-2261]. Support to connect with livy through https Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/9d40013a Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/9d40013a Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/9d40013a Branch: refs/heads/master Commit: 9d40013a994ca65f99831a416fb235d15ee17fdb Parents: e3315b2 Author: Jeff Zhang <zjf...@apache.org> Authored: Mon Mar 20 16:11:38 2017 +0800 Committer: Felix Cheung <felixche...@apache.org> Committed: Wed Mar 22 23:57:55 2017 -0700 ---------------------------------------------------------------------- docs/interpreter/livy.md | 10 +++ .../zeppelin/livy/BaseLivyInterprereter.java | 83 ++++++++++++++++---- .../zeppelin/livy/LivySQLInterpreterTest.java | 1 + 3 files changed, 79 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9d40013a/docs/interpreter/livy.md ---------------------------------------------------------------------- diff --git a/docs/interpreter/livy.md b/docs/interpreter/livy.md index 6f04244..ce1c34a 100644 --- a/docs/interpreter/livy.md +++ b/docs/interpreter/livy.md @@ -130,6 +130,16 @@ Example: `spark.driver.memory` to `livy.spark.driver.memory` <td></td> <td>Adding extra libraries to livy interpreter</td> </tr> + <tr> + <td>zeppelin.livy.ssl.trustStore</td> + <td></td> + <td>client trustStore file. Used when livy ssl is enabled</td> + </tr> + <tr> + <td>zeppelin.livy.ssl.trustStorePassword</td> + <td></td> + <td>password for trustStore file. Used when livy ssl is enabled</td> + </tr> </table> **We remove livy.spark.master in zeppelin-0.7. Because we sugguest user to use livy 0.3 in zeppelin-0.7. And livy 0.3 don't allow to specify livy.spark.master, it enfornce yarn-cluster mode.** http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9d40013a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java index c580901..d29d20c 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java @@ -21,6 +21,10 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.annotations.SerializedName; import org.apache.commons.lang.StringUtils; +import org.apache.http.client.HttpClient; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.conn.ssl.SSLContexts; +import org.apache.http.impl.client.HttpClients; import org.apache.zeppelin.interpreter.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,11 +32,16 @@ import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.ResponseEntity; +import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; import org.springframework.security.kerberos.client.KerberosRestTemplate; import org.springframework.web.client.HttpClientErrorException; import org.springframework.web.client.RestClientException; import org.springframework.web.client.RestTemplate; +import javax.net.ssl.SSLContext; +import java.io.FileInputStream; +import java.io.IOException; +import java.security.KeyStore; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -56,6 +65,7 @@ public abstract class BaseLivyInterprereter extends Interpreter { protected boolean displayAppInfo; private AtomicBoolean sessionExpired = new AtomicBoolean(false); protected LivyVersion livyVersion; + private RestTemplate restTemplate; // keep tracking the mapping between paragraphId and statementId, so that we can cancel the // statement after we execute it. @@ -72,6 +82,7 @@ public abstract class BaseLivyInterprereter extends Interpreter { property.getProperty("zeppelin.livy.session.create_timeout", 120 + "")); this.pullStatusInterval = Integer.parseInt( property.getProperty("zeppelin.livy.pull_status.interval.millis", 1000 + "")); + this.restTemplate = createRestTemplate(); } public abstract String getSessionKind(); @@ -148,7 +159,7 @@ public abstract class BaseLivyInterprereter extends Interpreter { InterpreterUtils.getMostRelevantMessage(e)); } } - + @Override public void cancel(InterpreterContext context) { if (livyVersion.isCancelSupported()) { @@ -321,12 +332,12 @@ public abstract class BaseLivyInterprereter extends Interpreter { } else { //TODO(zjffdu) support other types of data (like json, image and etc) String result = stmtInfo.output.data.plain_text; - + // check table magic result first if (stmtInfo.output.data.application_livy_table_json != null) { StringBuilder outputBuilder = new StringBuilder(); boolean notFirstColumn = false; - + for (Map header : stmtInfo.output.data.application_livy_table_json.headers) { if (notFirstColumn) { outputBuilder.append("\t"); @@ -334,17 +345,17 @@ public abstract class BaseLivyInterprereter extends Interpreter { outputBuilder.append(header.get("name")); notFirstColumn = true; } - + outputBuilder.append("\n"); for (List<Object> row : stmtInfo.output.data.application_livy_table_json.records) { outputBuilder.append(StringUtils.join(row, "\t")); - outputBuilder.append("\n"); - } + outputBuilder.append("\n"); + } return new InterpreterResult(InterpreterResult.Code.SUCCESS, - InterpreterResult.Type.TABLE, outputBuilder.toString()); - } else if (stmtInfo.output.data.image_png != null) { + InterpreterResult.Type.TABLE, outputBuilder.toString()); + } else if (stmtInfo.output.data.image_png != null) { return new InterpreterResult(InterpreterResult.Code.SUCCESS, - InterpreterResult.Type.IMG, (String) stmtInfo.output.data.image_png); + InterpreterResult.Type.IMG, (String) stmtInfo.output.data.image_png); } else if (result != null) { result = result.trim(); if (result.startsWith("<link") @@ -387,13 +398,56 @@ public abstract class BaseLivyInterprereter extends Interpreter { callRestAPI("/sessions/" + sessionInfo.id + "/statements/" + statementId + "/cancel", "POST"); } - private RestTemplate getRestTemplate() { + + private RestTemplate createRestTemplate() { + HttpClient httpClient = null; + if (livyURL.startsWith("https:")) { + String keystoreFile = property.getProperty("zeppelin.livy.ssl.trustStore"); + String password = property.getProperty("zeppelin.livy.ssl.trustStorePassword"); + if (StringUtils.isBlank(keystoreFile)) { + throw new RuntimeException("No zeppelin.livy.ssl.trustStore specified for livy ssl"); + } + if (StringUtils.isBlank(password)) { + throw new RuntimeException("No zeppelin.livy.ssl.trustStorePassword specified " + + "for livy ssl"); + } + FileInputStream inputStream = null; + try { + inputStream = new FileInputStream(keystoreFile); + KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); + trustStore.load(new FileInputStream(keystoreFile), password.toCharArray()); + SSLContext sslContext = SSLContexts.custom() + .loadTrustMaterial(trustStore) + .build(); + SSLConnectionSocketFactory csf = new SSLConnectionSocketFactory(sslContext); + httpClient = HttpClients.custom().setSSLSocketFactory(csf).build(); + } catch (Exception e) { + throw new RuntimeException("Failed to create SSL HttpClient", e); + } finally { + if (inputStream != null) { + try { + inputStream.close(); + } catch (IOException e) { + LOGGER.error("Failed to close keystore file", e); + } + } + } + } + String keytabLocation = property.getProperty("zeppelin.livy.keytab"); String principal = property.getProperty("zeppelin.livy.principal"); if (StringUtils.isNotEmpty(keytabLocation) && StringUtils.isNotEmpty(principal)) { - return new KerberosRestTemplate(keytabLocation, principal); + if (httpClient == null) { + return new KerberosRestTemplate(keytabLocation, principal); + } else { + return new KerberosRestTemplate(keytabLocation, principal, httpClient); + } + } + if (httpClient == null) { + return new RestTemplate(); + } else { + return new RestTemplate(new HttpComponentsClientHttpRequestFactory(httpClient)); } - return new RestTemplate(); } private String callRestAPI(String targetURL, String method) throws LivyException { @@ -404,7 +458,6 @@ public abstract class BaseLivyInterprereter extends Interpreter { throws LivyException { targetURL = livyURL + targetURL; LOGGER.debug("Call rest api in {}, method: {}, jsonData: {}", targetURL, method, jsonData); - RestTemplate restTemplate = getRestTemplate(); HttpHeaders headers = new HttpHeaders(); headers.add("Content-Type", "application/json"); headers.add("X-Requested-By", "zeppelin"); @@ -591,11 +644,11 @@ public abstract class BaseLivyInterprereter extends Interpreter { @SerializedName("application/vnd.livy.table.v1+json") public TableMagic application_livy_table_json; } - + private static class TableMagic { @SerializedName("headers") List<Map> headers; - + @SerializedName("data") List<List> records; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9d40013a/livy/src/test/java/org/apache/zeppelin/livy/LivySQLInterpreterTest.java ---------------------------------------------------------------------- diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivySQLInterpreterTest.java b/livy/src/test/java/org/apache/zeppelin/livy/LivySQLInterpreterTest.java index 9065f35..fdef9b1 100644 --- a/livy/src/test/java/org/apache/zeppelin/livy/LivySQLInterpreterTest.java +++ b/livy/src/test/java/org/apache/zeppelin/livy/LivySQLInterpreterTest.java @@ -36,6 +36,7 @@ public class LivySQLInterpreterTest { @Before public void setUp() { Properties properties = new Properties(); + properties.setProperty("zeppelin.livy.url", "http://localhost:8998"); properties.setProperty("zeppelin.livy.session.create_timeout", "120"); properties.setProperty("zeppelin.livy.spark.sql.maxResult", "3"); sqlInterpreter = new LivySparkSQLInterpreter(properties);