This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 735064f ZEPPELIN-3976. Create AbstractInterprter for common usage 735064f is described below commit 735064fdc57ae958fabae85b399bb5af3cb79144 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Tue Jan 29 15:13:02 2019 +0800 ZEPPELIN-3976. Create AbstractInterprter for common usage ### What is this PR for? This ticket creates AbstractInterpreter, which handles some common things: * Support ZeppelinContext * Support z variable replacement ### What type of PR is it? [Refactoring] ### Todos * [ ] - Task ### What is the Jira issue? * https://jira.apache.org/jira/browse/ZEPPELIN-3976 ### How should this be tested? * CI passes ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3293 from zjffdu/ZEPPELIN-3976 and squashes the following commits: 7b42a42f3 [Jeff Zhang] ZEPPELIN-3976. 
Create AbstractInterprter for common usage --- .../zeppelin/groovy/GroovyZeppelinContext.java | 2 +- .../apache/zeppelin/helium/DevZeppelinContext.java | 2 +- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 15 ++++-- .../jdbc/JDBCInterpreterInterpolationTest.java | 9 ++-- .../apache/zeppelin/jdbc/JDBCInterpreterTest.java | 35 ++++++++------ .../zeppelin/python/PythonZeppelinContext.java | 2 +- .../apache/zeppelin/sap/UniverseInterpreter.java | 18 +++++-- .../apache/zeppelin/shell/ShellInterpreter.java | 19 ++++++-- .../zeppelin/shell/ShellInterpreterTest.java | 7 +-- .../zeppelin/spark/AbstractSparkInterpreter.java | 5 +- .../apache/zeppelin/spark/NewSparkInterpreter.java | 7 +-- .../apache/zeppelin/spark/OldSparkInterpreter.java | 3 +- .../apache/zeppelin/spark/PySparkInterpreter.java | 2 +- .../apache/zeppelin/spark/SparkInterpreter.java | 6 +-- .../apache/zeppelin/spark/SparkRInterpreter.java | 2 +- .../apache/zeppelin/spark/SparkSqlInterpreter.java | 20 ++++++-- .../zeppelin/interpreter/AbstractInterpreter.java | 55 ++++++++++++++++++++++ .../zeppelin/interpreter/BaseZeppelinContext.java | 4 +- .../zeppelin/interpreter/KerberosInterpreter.java | 4 +- .../interpreter/BaseZeppelinContextTest.java | 2 +- 20 files changed, 157 insertions(+), 62 deletions(-) diff --git a/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyZeppelinContext.java b/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyZeppelinContext.java index 3d17462..3f0d600 100644 --- a/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyZeppelinContext.java +++ b/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyZeppelinContext.java @@ -43,7 +43,7 @@ public class GroovyZeppelinContext extends BaseZeppelinContext { } @Override - protected String showData(Object obj) { + public String showData(Object obj) { return null; } } diff --git a/helium-dev/src/main/java/org/apache/zeppelin/helium/DevZeppelinContext.java b/helium-dev/src/main/java/org/apache/zeppelin/helium/DevZeppelinContext.java 
index 75d193c..45d8b39 100644 --- a/helium-dev/src/main/java/org/apache/zeppelin/helium/DevZeppelinContext.java +++ b/helium-dev/src/main/java/org/apache/zeppelin/helium/DevZeppelinContext.java @@ -43,7 +43,7 @@ public class DevZeppelinContext extends BaseZeppelinContext { } @Override - protected String showData(Object obj) { + public String showData(Object obj) { return null; } } diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index dd361f5..a547171 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.alias.CredentialProvider; import org.apache.hadoop.security.alias.CredentialProviderFactory; +import org.apache.zeppelin.interpreter.BaseZeppelinContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -154,6 +155,11 @@ public class JDBCInterpreter extends KerberosInterpreter { } @Override + public BaseZeppelinContext getZeppelinContext() { + return null; + } + + @Override protected boolean runKerberosLogin() { try { if (UserGroupInformation.isLoginKeytabBased()) { @@ -803,9 +809,12 @@ public class JDBCInterpreter extends KerberosInterpreter { } @Override - public InterpreterResult interpret(String originalCmd, InterpreterContext contextInterpreter) { - String cmd = Boolean.parseBoolean(getProperty("zeppelin.jdbc.interpolation")) ? 
- interpolate(originalCmd, contextInterpreter.getResourcePool()) : originalCmd; + protected boolean isInterpolate() { + return Boolean.parseBoolean(getProperty("zeppelin.jdbc.interpolation", "false")); + } + + @Override + public InterpreterResult internalInterpret(String cmd, InterpreterContext contextInterpreter) { logger.debug("Run SQL command '{}'", cmd); String propertyKey = getPropertyKey(contextInterpreter); cmd = cmd.trim(); diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterInterpolationTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterInterpolationTest.java index d55f9fe..1ff246b 100644 --- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterInterpolationTest.java +++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterInterpolationTest.java @@ -16,6 +16,7 @@ package org.apache.zeppelin.jdbc; import com.mockrunner.jdbc.BasicJDBCTestCaseAdapter; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.resource.LocalResourcePool; import org.apache.zeppelin.resource.ResourcePool; @@ -77,7 +78,7 @@ public class JDBCInterpreterInterpolationTest extends BasicJDBCTestCaseAdapter { } @Test - public void testEnableDisableProperty() throws IOException { + public void testEnableDisableProperty() throws IOException, InterpreterException { Properties properties = new Properties(); properties.setProperty("common.max_count", "1000"); properties.setProperty("common.max_retry", "3"); @@ -115,7 +116,7 @@ public class JDBCInterpreterInterpolationTest extends BasicJDBCTestCaseAdapter { } @Test - public void testNormalQueryInterpolation() throws IOException { + public void testNormalQueryInterpolation() throws IOException, InterpreterException { Properties properties = new Properties(); properties.setProperty("common.max_count", "1000"); 
properties.setProperty("common.max_retry", "3"); @@ -154,7 +155,7 @@ public class JDBCInterpreterInterpolationTest extends BasicJDBCTestCaseAdapter { } @Test - public void testEscapedInterpolationPattern() throws IOException { + public void testEscapedInterpolationPattern() throws IOException, InterpreterException { Properties properties = new Properties(); properties.setProperty("common.max_count", "1000"); properties.setProperty("common.max_retry", "3"); @@ -177,7 +178,7 @@ public class JDBCInterpreterInterpolationTest extends BasicJDBCTestCaseAdapter { assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code()); assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType()); assertEquals(1, interpreterResult.message().size()); - assertEquals("ID\tNAME\nkey\tkeyboard\nmou\tmouse\n", + assertEquals("ID\tNAME\nkey\tkeyboard\nmou\tmouse\n", interpreterResult.message().get(0).getData()); } diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java index c340768..995c530 100644 --- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java +++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java @@ -133,7 +133,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { } @Test - public void testForMapPrefix() throws SQLException, IOException { + public void testForMapPrefix() throws SQLException, IOException, InterpreterException { Properties properties = new Properties(); properties.setProperty("common.max_count", "1000"); properties.setProperty("common.max_retry", "3"); @@ -170,7 +170,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { } @Test - public void testSelectQuery() throws SQLException, IOException { + public void testSelectQuery() throws SQLException, IOException, InterpreterException { Properties properties = new Properties(); 
properties.setProperty("common.max_count", "1000"); properties.setProperty("common.max_retry", "3"); @@ -191,7 +191,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { } @Test - public void testColumnAliasQuery() throws IOException { + public void testColumnAliasQuery() throws IOException, InterpreterException { Properties properties = new Properties(); properties.setProperty("common.max_count", "1000"); properties.setProperty("common.max_retry", "3"); @@ -243,7 +243,8 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { } @Test - public void testQueryWithEscapedCharacters() throws SQLException, IOException { + public void testQueryWithEscapedCharacters() throws SQLException, IOException, + InterpreterException { String sqlQuery = "select '\\n', ';';" + "select replace('A\\;B', '\\', 'text');" + "select '\\', ';';" + @@ -274,7 +275,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { } @Test - public void testSelectMultipleQueries() throws SQLException, IOException { + public void testSelectMultipleQueries() throws SQLException, IOException, InterpreterException { Properties properties = new Properties(); properties.setProperty("common.max_count", "1000"); properties.setProperty("common.max_retry", "3"); @@ -301,7 +302,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { } @Test - public void testDefaultSplitQuries() throws SQLException, IOException { + public void testDefaultSplitQuries() throws SQLException, IOException, InterpreterException { Properties properties = new Properties(); properties.setProperty("common.max_count", "1000"); properties.setProperty("common.max_retry", "3"); @@ -324,7 +325,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { } @Test - public void testSelectQueryWithNull() throws SQLException, IOException { + public void testSelectQueryWithNull() throws SQLException, IOException, InterpreterException { Properties properties = new 
Properties(); properties.setProperty("common.max_count", "1000"); properties.setProperty("common.max_retry", "3"); @@ -346,7 +347,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { @Test - public void testSelectQueryMaxResult() throws SQLException, IOException { + public void testSelectQueryMaxResult() throws SQLException, IOException, InterpreterException { Properties properties = new Properties(); properties.setProperty("common.max_count", "1"); properties.setProperty("common.max_retry", "3"); @@ -444,7 +445,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { } @Test - public void testMultiTenant() throws SQLException, IOException { + public void testMultiTenant() throws SQLException, IOException, InterpreterException { /* * assume that the database user is 'dbuser' and password is 'dbpassword' * 'jdbc1' interpreter has user('dbuser')/password('dbpassword') property @@ -513,7 +514,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { } @Test - public void testPrecode() throws SQLException, IOException { + public void testPrecode() throws SQLException, IOException, InterpreterException { Properties properties = new Properties(); properties.setProperty("default.driver", "org.h2.Driver"); properties.setProperty("default.url", getJdbcConnection()); @@ -556,7 +557,8 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { } @Test - public void testPrecodeWithAnotherPrefix() throws SQLException, IOException { + public void testPrecodeWithAnotherPrefix() throws SQLException, IOException, + InterpreterException { Properties properties = new Properties(); properties.setProperty("anotherPrefix.driver", "org.h2.Driver"); properties.setProperty("anotherPrefix.url", getJdbcConnection()); @@ -585,7 +587,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { } @Test - public void testStatementPrecode() throws SQLException, IOException { + public void testStatementPrecode() throws 
SQLException, IOException, InterpreterException { Properties properties = new Properties(); properties.setProperty("default.driver", "org.h2.Driver"); properties.setProperty("default.url", getJdbcConnection()); @@ -605,7 +607,8 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { } @Test - public void testIncorrectStatementPrecode() throws SQLException, IOException { + public void testIncorrectStatementPrecode() throws SQLException, IOException, + InterpreterException { Properties properties = new Properties(); properties.setProperty("default.driver", "org.h2.Driver"); properties.setProperty("default.url", getJdbcConnection()); @@ -624,7 +627,8 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { } @Test - public void testStatementPrecodeWithAnotherPrefix() throws SQLException, IOException { + public void testStatementPrecodeWithAnotherPrefix() throws SQLException, IOException, + InterpreterException { Properties properties = new Properties(); properties.setProperty("anotherPrefix.driver", "org.h2.Driver"); properties.setProperty("anotherPrefix.url", getJdbcConnection()); @@ -652,7 +656,8 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { } @Test - public void testSplitSqlQueryWithComments() throws SQLException, IOException { + public void testSplitSqlQueryWithComments() throws SQLException, IOException, + InterpreterException { Properties properties = new Properties(); properties.setProperty("common.max_count", "1000"); properties.setProperty("common.max_retry", "3"); diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonZeppelinContext.java b/python/src/main/java/org/apache/zeppelin/python/PythonZeppelinContext.java index 526784e..855ddf7 100644 --- a/python/src/main/java/org/apache/zeppelin/python/PythonZeppelinContext.java +++ b/python/src/main/java/org/apache/zeppelin/python/PythonZeppelinContext.java @@ -43,7 +43,7 @@ public class PythonZeppelinContext extends BaseZeppelinContext { } 
@Override - protected String showData(Object obj) { + public String showData(Object obj) { return null; } } diff --git a/sap/src/main/java/org/apache/zeppelin/sap/UniverseInterpreter.java b/sap/src/main/java/org/apache/zeppelin/sap/UniverseInterpreter.java index 17da1c9..3bea686 100644 --- a/sap/src/main/java/org/apache/zeppelin/sap/UniverseInterpreter.java +++ b/sap/src/main/java/org/apache/zeppelin/sap/UniverseInterpreter.java @@ -18,6 +18,8 @@ package org.apache.zeppelin.sap; import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.interpreter.AbstractInterpreter; +import org.apache.zeppelin.interpreter.BaseZeppelinContext; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; @@ -36,7 +38,7 @@ import java.util.concurrent.TimeUnit; /** * SAP Universe interpreter for Zeppelin. */ -public class UniverseInterpreter extends Interpreter { +public class UniverseInterpreter extends AbstractInterpreter { public UniverseInterpreter(Properties properties) { super(properties); @@ -80,10 +82,18 @@ public class UniverseInterpreter extends Interpreter { } @Override - public InterpreterResult interpret(String originalSt, InterpreterContext context) + protected boolean isInterpolate() { + return Boolean.parseBoolean(getProperty("universe.interpolation", "false")); + } + + @Override + public BaseZeppelinContext getZeppelinContext() { + return null; + } + + @Override + public InterpreterResult internalInterpret(String st, InterpreterContext context) throws InterpreterException { - final String st = Boolean.parseBoolean(getProperty("universe.interpolation", "false")) ? 
- interpolate(originalSt, context.getResourcePool()) : originalSt; try { InterpreterResult interpreterResult = new InterpreterResult(InterpreterResult.Code.SUCCESS); String paragraphId = context.getParagraphId(); diff --git a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java index c686896..8bdbcaa 100644 --- a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java +++ b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java @@ -23,6 +23,7 @@ import org.apache.commons.exec.ExecuteException; import org.apache.commons.exec.ExecuteWatchdog; import org.apache.commons.exec.PumpStreamHandler; import org.apache.commons.lang3.StringUtils; +import org.apache.zeppelin.interpreter.BaseZeppelinContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,14 +84,22 @@ public class ShellInterpreter extends KerberosInterpreter { } } + @Override + protected boolean isInterpolate() { + return Boolean.parseBoolean(getProperty("zeppelin.shell.interpolation", "false")); + } @Override - public InterpreterResult interpret(String originalCmd, InterpreterContext contextInterpreter) { - String cmd = Boolean.parseBoolean(getProperty("zeppelin.shell.interpolation")) ? 
- interpolate(originalCmd, contextInterpreter.getResourcePool()) : originalCmd; + public BaseZeppelinContext getZeppelinContext() { + return null; + } + + @Override + public InterpreterResult internalInterpret(String cmd, + InterpreterContext contextInterpreter) { LOGGER.debug("Run shell command '" + cmd + "'"); OutputStream outStream = new ByteArrayOutputStream(); - + CommandLine cmdLine = CommandLine.parse(shell); // the Windows CMD shell doesn't handle multiline statements, // they need to be delimited by '&&' instead @@ -113,7 +122,7 @@ public class ShellInterpreter extends KerberosInterpreter { } int exitVal = executor.execute(cmdLine); - LOGGER.info("Paragraph " + contextInterpreter.getParagraphId() + LOGGER.info("Paragraph " + contextInterpreter.getParagraphId() + " return with exit value: " + exitVal); return new InterpreterResult(Code.SUCCESS, outStream.toString()); } catch (ExecuteException e) { diff --git a/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java b/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java index 5a8f4b4..78efa1d 100644 --- a/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java +++ b/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java @@ -20,6 +20,7 @@ package org.apache.zeppelin.shell; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import org.apache.zeppelin.interpreter.InterpreterException; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -51,7 +52,7 @@ public class ShellInterpreterTest { } @Test - public void test() { + public void test() throws InterpreterException { if (System.getProperty("os.name").startsWith("Windows")) { result = shell.interpret("dir", context); } else { @@ -65,7 +66,7 @@ public class ShellInterpreterTest { } @Test - public void testInvalidCommand(){ + public void testInvalidCommand() throws InterpreterException { if 
(System.getProperty("os.name").startsWith("Windows")) { result = shell.interpret("invalid_command\ndir", context); } else { @@ -76,7 +77,7 @@ public class ShellInterpreterTest { } @Test - public void testShellTimeout() { + public void testShellTimeout() throws InterpreterException { if (System.getProperty("os.name").startsWith("Windows")) { result = shell.interpret("timeout 4", context); } else { diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkInterpreter.java index 239a7fe..91fa7de 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkInterpreter.java @@ -20,6 +20,7 @@ package org.apache.zeppelin.spark; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; +import org.apache.zeppelin.interpreter.AbstractInterpreter; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -29,7 +30,7 @@ import java.util.Properties; * Abstract class for SparkInterpreter. 
For the purpose of co-exist of NewSparkInterpreter * and OldSparkInterpreter */ -public abstract class AbstractSparkInterpreter extends Interpreter { +public abstract class AbstractSparkInterpreter extends AbstractInterpreter { private SparkInterpreter parentSparkInterpreter; @@ -49,8 +50,6 @@ public abstract class AbstractSparkInterpreter extends Interpreter { public abstract JavaSparkContext getJavaSparkContext(); - public abstract SparkZeppelinContext getZeppelinContext(); - public abstract String getSparkUIUrl(); public abstract boolean isUnsupportedSparkVersion(); diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java index 23e6dad..4a39cfe 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java @@ -24,6 +24,7 @@ import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; +import org.apache.zeppelin.interpreter.BaseZeppelinContext; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterHookRegistry; @@ -143,11 +144,7 @@ public class NewSparkInterpreter extends AbstractSparkInterpreter { } @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - InterpreterContext.set(context); - z.setGui(context.getGui()); - z.setNoteGui(context.getNoteGui()); - z.setInterpreterContext(context); + public InterpreterResult internalInterpret(String st, InterpreterContext context) { sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false); // set spark.scheduler.pool to null to clear the pool assosiated with this paragraph // 
sc.setLocalProperty("spark.scheduler.pool", null) will clean the pool diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java index 6f157a0..8eb3959 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java @@ -998,12 +998,11 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { * Interpret a single line. */ @Override - public InterpreterResult interpret(String line, InterpreterContext context) { + public InterpreterResult internalInterpret(String line, InterpreterContext context) { if (isUnsupportedSparkVersion()) { return new InterpreterResult(Code.ERROR, "Spark " + sparkVersion.toString() + " is not supported"); } - z.setInterpreterContext(context); if (line == null || line.trim().length() == 0) { return new InterpreterResult(Code.SUCCESS); } diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 32e805b..486eca0 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -186,7 +186,7 @@ public class PySparkInterpreter extends PythonInterpreter { return "python"; } - public SparkZeppelinContext getZeppelinContext() { + public BaseZeppelinContext getZeppelinContext() { if (sparkInterpreter != null) { return sparkInterpreter.getZeppelinContext(); } else { diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 4a9a9de..43b9e76 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -20,7 +20,7 @@ package org.apache.zeppelin.spark; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; -import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.BaseZeppelinContext; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; @@ -72,7 +72,7 @@ public class SparkInterpreter extends AbstractSparkInterpreter { } @Override - public InterpreterResult interpret(String st, InterpreterContext context) + public InterpreterResult internalInterpret(String st, InterpreterContext context) throws InterpreterException { return delegation.interpret(st, context); } @@ -136,7 +136,7 @@ public class SparkInterpreter extends AbstractSparkInterpreter { } @Override - public SparkZeppelinContext getZeppelinContext() { + public BaseZeppelinContext getZeppelinContext() { return delegation.getZeppelinContext(); } diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java index 8f55a87..7265ae4 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java @@ -101,7 +101,7 @@ public class SparkRInterpreter extends Interpreter { ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession()); } ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext()); - ZeppelinRContext.setZeppelinContext(sparkInterpreter.getZeppelinContext()); + ZeppelinRContext.setZeppelinContext((SparkZeppelinContext) sparkInterpreter.getZeppelinContext()); zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, SparkRBackend.port(), sparkVersion, timeout, 
this); try { diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index e717b2a..7843435 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -19,6 +19,8 @@ package org.apache.zeppelin.spark; import org.apache.spark.SparkContext; import org.apache.spark.sql.SQLContext; +import org.apache.zeppelin.interpreter.AbstractInterpreter; +import org.apache.zeppelin.interpreter.BaseZeppelinContext; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; @@ -37,7 +39,7 @@ import java.util.Properties; /** * Spark SQL interpreter for Zeppelin. */ -public class SparkSqlInterpreter extends Interpreter { +public class SparkSqlInterpreter extends AbstractInterpreter { private Logger logger = LoggerFactory.getLogger(SparkSqlInterpreter.class); private SparkInterpreter sparkInterpreter; @@ -59,7 +61,17 @@ public class SparkSqlInterpreter extends Interpreter { public void close() {} @Override - public InterpreterResult interpret(String st, InterpreterContext context) + protected boolean isInterpolate() { + return Boolean.parseBoolean(getProperty("zeppelin.spark.sql.interpolation", "false")); + } + + @Override + public BaseZeppelinContext getZeppelinContext() { + return null; + } + + @Override + public InterpreterResult internalInterpret(String st, InterpreterContext context) throws InterpreterException { if (sparkInterpreter.isUnsupportedSparkVersion()) { return new InterpreterResult(Code.ERROR, "Spark " @@ -73,11 +85,9 @@ public class SparkSqlInterpreter extends Interpreter { sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false); try { - String effectiveSQL = 
Boolean.parseBoolean(getProperty("zeppelin.spark.sql.interpolation")) ? - interpolate(st, context.getResourcePool()) : st; Method method = sqlc.getClass().getMethod("sql", String.class); String msg = sparkInterpreter.getZeppelinContext().showData( - method.invoke(sqlc, effectiveSQL)); + method.invoke(sqlc, st)); sc.clearJobGroup(); return new InterpreterResult(Code.SUCCESS, msg); } catch (Exception e) { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/AbstractInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/AbstractInterpreter.java new file mode 100644 index 0000000..72df2c8 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/AbstractInterpreter.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.interpreter; + +import java.util.Properties; + +public abstract class AbstractInterpreter extends Interpreter { + + public AbstractInterpreter(Properties properties) { + super(properties); + } + + @Override + public InterpreterResult interpret(String st, + InterpreterContext context) throws InterpreterException { + InterpreterContext.set(context); + BaseZeppelinContext z = getZeppelinContext(); + if (z != null) { + z.setGui(context.getGui()); + z.setNoteGui(context.getNoteGui()); + z.setInterpreterContext(context); + } + boolean interpolate = isInterpolate() || + Boolean.parseBoolean(context.getLocalProperties().getOrDefault("interpolate", "false")); + if (interpolate) { + st = interpolate(st, context.getResourcePool()); + } + return internalInterpret(st, context); + } + + public abstract BaseZeppelinContext getZeppelinContext(); + + protected boolean isInterpolate() { + return false; + } + + protected abstract InterpreterResult internalInterpret( + String st, + InterpreterContext context) throws InterpreterException; +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java index 6a44f12..3140a4c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java @@ -71,7 +71,7 @@ public abstract class BaseZeppelinContext { * @param obj * @return */ - protected abstract String showData(Object obj); + public abstract String showData(Object obj); /** * @deprecated use z.textbox instead @@ -225,7 +225,7 @@ public abstract class BaseZeppelinContext { public void setMaxResult(int maxResult) { this.maxResult = maxResult; } - + /** * display special types of objects for interpreter. * Each interpreter can has its own supported classes. 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java index 4da5ef5..9c2353b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java @@ -38,13 +38,13 @@ import org.slf4j.LoggerFactory; * startKerberosLoginThread() needs to be called inside the open() and * shutdownExecutorService() inside close(). * - * + * * Environment variables defined in zeppelin-env.sh * KERBEROS_REFRESH_INTERVAL controls the refresh interval for Kerberos ticket. The default value * is 1d. * KINIT_FAIL_THRESHOLD controls how many times should kinit retry. The default value is 5. */ -public abstract class KerberosInterpreter extends Interpreter { +public abstract class KerberosInterpreter extends AbstractInterpreter { private Integer kinitFailCount = 0; private ScheduledExecutorService scheduledExecutorService; diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/BaseZeppelinContextTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/BaseZeppelinContextTest.java index 985ba4f..d8323bb 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/BaseZeppelinContextTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/BaseZeppelinContextTest.java @@ -133,7 +133,7 @@ public class BaseZeppelinContextTest { } @Override - protected String showData(Object obj) { + public String showData(Object obj) { return null; } }