Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.8 f1c2b5b45 -> 226e5b8b6


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/226e5b8b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
new file mode 100644
index 0000000..50930a7
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
@@ -0,0 +1,190 @@
+package org.apache.zeppelin.interpreter;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(value = Parameterized.class)
+public class SparkIntegrationTest {
+  private static Logger LOGGER = LoggerFactory.getLogger(SparkIntegrationTest.class);
+
+  private static MiniHadoopCluster hadoopCluster;
+  private static MiniZeppelin zeppelin;
+  private static InterpreterFactory interpreterFactory;
+  private static InterpreterSettingManager interpreterSettingManager;
+
+  private String sparkVersion;
+  private String sparkHome;
+
+  public SparkIntegrationTest(String sparkVersion) {
+    LOGGER.info("Testing SparkVersion: " + sparkVersion);
+    this.sparkVersion = sparkVersion;
+    this.sparkHome = SparkDownloadUtils.downloadSpark(sparkVersion);
+  }
+
+  @Parameterized.Parameters
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+        {"2.2.1"},
+        {"2.1.2"},
+        {"2.0.2"},
+        {"1.6.3"}
+    });
+  }
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    hadoopCluster = new MiniHadoopCluster();
+    hadoopCluster.start();
+
+    zeppelin = new MiniZeppelin();
+    zeppelin.start();
+    interpreterFactory = zeppelin.getInterpreterFactory();
+    interpreterSettingManager = zeppelin.getInterpreterSettingManager();
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (zeppelin != null) {
+      zeppelin.stop();
+    }
+    if (hadoopCluster != null) {
+      hadoopCluster.stop();
+    }
+  }
+
+  private void testInterpreterBasics() throws IOException, InterpreterException {
+    // test SparkInterpreter
+    interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds());
+    Interpreter sparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.spark");
+
+    InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").getContext();
+    InterpreterResult interpreterResult = sparkInterpreter.interpret("sc.version", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+    String detectedSparkVersion = interpreterResult.message().get(0).getData();
+    assertTrue(detectedSparkVersion +" doesn't contain " + this.sparkVersion, detectedSparkVersion.contains(this.sparkVersion));
+    interpreterResult = sparkInterpreter.interpret("sc.range(1,10).sum()", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+    assertTrue(interpreterResult.msg.get(0).getData().contains("45"));
+
+    // test PySparkInterpreter
+    Interpreter pySparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.pyspark");
+    interpreterResult = pySparkInterpreter.interpret("sqlContext.createDataFrame([(1,'a'),(2,'b')], ['id','name']).registerTempTable('test')", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+
+    // test IPySparkInterpreter
+    Interpreter ipySparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.ipyspark");
+    interpreterResult = ipySparkInterpreter.interpret("sqlContext.table('test').show()", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+
+    // test SparkSQLInterpreter
+    Interpreter sqlInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.sql");
+    interpreterResult = sqlInterpreter.interpret("select count(1) as c from test", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
+    assertEquals("c\n2\n", interpreterResult.message().get(0).getData());
+
+    // test SparkRInterpreter
+    Interpreter sparkrInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.r");
+    if (isSpark2()) {
+      interpreterResult = sparkrInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", context);
+    } else {
+      interpreterResult = sparkrInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", context);
+    }
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+    assertEquals(InterpreterResult.Type.TEXT, interpreterResult.message().get(0).getType());
+    assertTrue(interpreterResult.message().get(0).getData().contains("eruptions waiting"));
+  }
+
+  @Test
+  public void testLocalMode() throws IOException, YarnException, InterpreterException {
+    InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
+    sparkInterpreterSetting.setProperty("master", "local[*]");
+    sparkInterpreterSetting.setProperty("SPARK_HOME", sparkHome);
+    sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
+    sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
+    sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
+
+    testInterpreterBasics();
+
+    // no yarn application launched
+    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+    assertEquals(0, response.getApplicationList().size());
+
+    interpreterSettingManager.close();
+  }
+
+  @Test
+  public void testYarnClientMode() throws IOException, YarnException, InterruptedException, InterpreterException {
+    InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
+    sparkInterpreterSetting.setProperty("master", "yarn-client");
+    sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
+    sparkInterpreterSetting.setProperty("SPARK_HOME", sparkHome);
+    sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
+    sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
+    sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
+    sparkInterpreterSetting.setProperty("PYSPARK_PYTHON", getPythonExec());
+    sparkInterpreterSetting.setProperty("spark.driver.memory", "512m");
+
+    testInterpreterBasics();
+
+    // 1 yarn application launched
+    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+    assertEquals(1, response.getApplicationList().size());
+
+    interpreterSettingManager.close();
+  }
+
+  @Test
+  public void testYarnClusterMode() throws IOException, YarnException, InterruptedException, InterpreterException {
+    InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
+    sparkInterpreterSetting.setProperty("master", "yarn-cluster");
+    sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
+    sparkInterpreterSetting.setProperty("SPARK_HOME", sparkHome);
+    sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
+    sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
+    sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
+    sparkInterpreterSetting.setProperty("spark.pyspark.python", getPythonExec());
+    sparkInterpreterSetting.setProperty("spark.driver.memory", "512m");
+
+    testInterpreterBasics();
+
+    // 1 yarn application launched
+    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+    assertEquals(1, response.getApplicationList().size());
+
+    interpreterSettingManager.close();
+  }
+
+  private boolean isSpark2() {
+    return this.sparkVersion.startsWith("2.");
+  }
+
+  private String getPythonExec() throws IOException, InterruptedException {
+    Process process = Runtime.getRuntime().exec(new String[]{"which", "python"});
+    if (process.waitFor() != 0) {
+      throw new RuntimeException("Fail to run command: which python.");
+    }
+    return IOUtils.toString(process.getInputStream()).trim();
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/226e5b8b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java
deleted file mode 100644
index 22bb17e..0000000
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-package org.apache.zeppelin.interpreter;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.EnumSet;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class SparkInterpreterModeTest {
-
-  private static MiniHadoopCluster hadoopCluster;
-  private static MiniZeppelin zeppelin;
-  private static InterpreterFactory interpreterFactory;
-  private static InterpreterSettingManager interpreterSettingManager;
-
-  @BeforeClass
-  public static void setUp() throws IOException {
-    hadoopCluster = new MiniHadoopCluster();
-    hadoopCluster.start();
-
-    zeppelin = new MiniZeppelin();
-    zeppelin.start();
-    interpreterFactory = zeppelin.getInterpreterFactory();
-    interpreterSettingManager = zeppelin.getInterpreterSettingManager();
-  }
-
-  @AfterClass
-  public static void tearDown() throws IOException {
-    if (zeppelin != null) {
-      zeppelin.stop();
-    }
-    if (hadoopCluster != null) {
-      hadoopCluster.stop();
-    }
-  }
-
-  private void testInterpreterBasics() throws IOException, InterpreterException {
-    // test SparkInterpreter
-    interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds());
-    Interpreter sparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.spark");
-
-    InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").getContext();
-    InterpreterResult interpreterResult = sparkInterpreter.interpret("sc.version", context);
-    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
-    String sparkVersion = interpreterResult.message().get(0).getData();
-    interpreterResult = sparkInterpreter.interpret("sc.range(1,10).sum()", context);
-    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
-    assertTrue(interpreterResult.msg.get(0).getData().contains("45"));
-
-    // test PySparkInterpreter
-    Interpreter pySparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.pyspark");
-    interpreterResult = pySparkInterpreter.interpret("sqlContext.createDataFrame([(1,'a'),(2,'b')], ['id','name']).registerTempTable('test')", context);
-    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
-
-    // test IPySparkInterpreter
-    Interpreter ipySparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.ipyspark");
-    interpreterResult = ipySparkInterpreter.interpret("sqlContext.table('test').show()", context);
-    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
-
-    // test SparkSQLInterpreter
-    Interpreter sqlInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.sql");
-    interpreterResult = sqlInterpreter.interpret("select count(1) from test", context);
-    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
-    assertEquals("count(1)\n2\n", interpreterResult.message().get(0).getData());
-
-    // test SparkRInterpreter
-    Interpreter sparkrInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.r");
-    if (isSpark2(sparkVersion)) {
-      interpreterResult = sparkrInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", context);
-    } else {
-      interpreterResult = sparkrInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", context);
-    }
-    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
-    assertEquals(InterpreterResult.Type.TEXT, interpreterResult.message().get(0).getType());
-    assertTrue(interpreterResult.message().get(0).getData().contains("eruptions waiting"));
-  }
-
-  @Test
-  public void testLocalMode() throws IOException, YarnException, InterpreterException {
-    InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
-    sparkInterpreterSetting.setProperty("master", "local[*]");
-    sparkInterpreterSetting.setProperty("SPARK_HOME", System.getenv("SPARK_HOME"));
-    sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
-    sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
-    sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
-
-    testInterpreterBasics();
-
-    // no yarn application launched
-    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
-    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
-    assertEquals(0, response.getApplicationList().size());
-
-    interpreterSettingManager.close();
-  }
-
-  @Test
-  public void testYarnClientMode() throws IOException, YarnException, InterruptedException, InterpreterException {
-    InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
-    sparkInterpreterSetting.setProperty("master", "yarn-client");
-    sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
-    sparkInterpreterSetting.setProperty("SPARK_HOME", System.getenv("SPARK_HOME"));
-    sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
-    sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
-    sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
-    sparkInterpreterSetting.setProperty("PYSPARK_PYTHON", getPythonExec());
-    sparkInterpreterSetting.setProperty("spark.driver.memory", "512m");
-
-    testInterpreterBasics();
-
-    // 1 yarn application launched
-    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
-    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
-    assertEquals(1, response.getApplicationList().size());
-
-    interpreterSettingManager.close();
-  }
-
-  @Test
-  public void testYarnClusterMode() throws IOException, YarnException, InterruptedException, InterpreterException {
-    InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
-    sparkInterpreterSetting.setProperty("master", "yarn-cluster");
-    sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
-    sparkInterpreterSetting.setProperty("SPARK_HOME", System.getenv("SPARK_HOME"));
-    sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
-    sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
-    sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
-    sparkInterpreterSetting.setProperty("spark.pyspark.python", getPythonExec());
-    sparkInterpreterSetting.setProperty("spark.driver.memory", "512m");
-
-    testInterpreterBasics();
-
-    // 1 yarn application launched
-    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
-    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
-    assertEquals(1, response.getApplicationList().size());
-
-    interpreterSettingManager.close();
-  }
-
-  private boolean isSpark2(String sparkVersion) {
-    return sparkVersion.startsWith("2.");
-  }
-
-  private String getPythonExec() throws IOException, InterruptedException {
-    Process process = Runtime.getRuntime().exec(new String[]{"which", "python"});
-    if (process.waitFor() != 0) {
-      throw new RuntimeException("Fail to run command: which python.");
-    }
-    return IOUtils.toString(process.getInputStream()).trim();
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/226e5b8b/zeppelin-zengine/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/resources/log4j.properties b/zeppelin-zengine/src/test/resources/log4j.properties
index 74f619b..ecfd05e 100644
--- a/zeppelin-zengine/src/test/resources/log4j.properties
+++ b/zeppelin-zengine/src/test/resources/log4j.properties
@@ -28,8 +28,6 @@ log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n
 # Root logger option
 log4j.rootLogger=INFO, stdout
 
-log4j.logger.org.apache.zeppelin.notebook.repo=DEBUG
- 
 #mute some noisy guys
 log4j.logger.org.apache.hadoop.mapred=WARN
 log4j.logger.org.apache.hadoop.hive.ql=WARN
@@ -44,6 +42,6 @@ log4j.logger.DataNucleus.Datastore=ERROR
 # Log all JDBC parameters
 log4j.logger.org.hibernate.type=ALL
 
-log4j.logger.org.apache.zeppelin.interpreter=DEBUG
-log4j.logger.org.apache.zeppelin.scheduler=DEBUG
+log4j.logger.org.apache.hadoop=WARN
 
+log4j.logger.org.apache.zeppelin.interpreter=DEBUG

Reply via email to