Repository: incubator-zeppelin Updated Branches: refs/heads/master 266ffc9a7 -> 1bc5e8df9
ZEPPELIN-61 Lens Interpreter Add Lens interpreter. #100 has the implementation from praagarw. This PR grabs the necessary commits from #100 and rebases them onto the current master branch. Author: Pranav Agarwal <[email protected]> Author: Lee moon soo <[email protected]> Closes #114 from Leemoonsoo/ZEPPELIN-61 and squashes the following commits: 0edaf8c [Lee moon soo] Bumpup lens interpreter artifact version adc23f9 [Pranav Agarwal] fixed merge conflicts for conf/zeppelin-site.xml.template 97eb569 [Pranav Agarwal] added lens entry to conf/zeppelin-site.xml.template b81ab78 [Pranav Agarwal] Lens Interpreter Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/1bc5e8df Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/1bc5e8df Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/1bc5e8df Branch: refs/heads/master Commit: 1bc5e8df922aff6332c9b5365f71944ca063fd01 Parents: 266ffc9 Author: Pranav Agarwal <[email protected]> Authored: Sat Jun 27 11:15:56 2015 -0700 Committer: Lee moon soo <[email protected]> Committed: Sat Jun 27 12:12:55 2015 -0700 ---------------------------------------------------------------------- conf/zeppelin-site.xml.template | 2 +- lens/pom.xml | 303 +++++++++++++ .../apache/zeppelin/lens/ExecutionDetail.java | 44 ++ .../org/apache/zeppelin/lens/LensBootstrap.java | 41 ++ .../apache/zeppelin/lens/LensInterpreter.java | 451 +++++++++++++++++++ .../zeppelin/lens/LensJLineShellComponent.java | 244 ++++++++++ .../lens/LensSimpleExecutionStrategy.java | 86 ++++ .../zeppelin/lens/LensInterpreterTest.java | 71 +++ pom.xml | 1 + .../zeppelin/conf/ZeppelinConfiguration.java | 3 +- 10 files changed, 1244 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1bc5e8df/conf/zeppelin-site.xml.template 
---------------------------------------------------------------------- diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 8f8bf06..7bd5c3a 100644 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -66,7 +66,7 @@ <property> <name>zeppelin.interpreters</name> - <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter</value> + <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter</value> <description>Comma separated interpreter configurations. First interpreter become a default</description> </property> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1bc5e8df/lens/pom.xml ---------------------------------------------------------------------- diff --git a/lens/pom.xml b/lens/pom.xml new file mode 100644 index 0000000..d665346 --- /dev/null +++ b/lens/pom.xml @@ -0,0 +1,303 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. 
See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>zeppelin</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.6.0-incubating-SNAPSHOT</version> + </parent> + + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-lens</artifactId> + <packaging>jar</packaging> + <version>0.6.0-incubating-SNAPSHOT</version> + <name>Zeppelin: Lens interpreter</name> + <url>http://www.apache.org</url> + + <properties> + <lens.version>2.2.0-beta-incubating-SNAPSHOT</lens.version> + <spring-shell.version>1.1.0.RELEASE</spring-shell.version> + <hadoop-common.version>2.4.0</hadoop-common.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-interpreter</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </dependency> + + <dependency> + 
<groupId>org.checkerframework</groupId> + <artifactId>jdk7</artifactId> + <version>1.9.1</version> + </dependency> + + <dependency> + <groupId>org.apache.lens</groupId> + <artifactId>lens-cli</artifactId> + <version>${lens.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.lens</groupId> + <artifactId>lens-client</artifactId> + <version>${lens.version}</version> + </dependency> + + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + <version>1.9.13</version> + </dependency> + + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>1.9.13</version> + </dependency> + + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-xc</artifactId> + <version>1.9.11</version> + </dependency> + + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-jaxrs</artifactId> + <version>1.9.11</version> + </dependency> + + <dependency> + <groupId>org.glassfish.jersey.core</groupId> + <artifactId>jersey-server</artifactId> + <version>2.3.1</version> + </dependency> + + <dependency> + <groupId>org.glassfish.jersey.core</groupId> + <artifactId>jersey-client</artifactId> + <version>2.3.1</version> + </dependency> + + <dependency> + <groupId>org.springframework.shell</groupId> + <artifactId>spring-shell</artifactId> + <version>${spring-shell.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop-common.version}</version> + <exclusions> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>junit</groupId> + 
<artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-deploy-plugin</artifactId> + <version>2.7</version> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + + <plugin> + <artifactId>maven-enforcer-plugin</artifactId> + <version>1.3.1</version> + <executions> + <execution> + <id>enforce</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.8</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/../../interpreter/lens</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <includeScope>runtime</includeScope> + </configuration> + </execution> + <execution> + <id>copy-artifact</id> + <phase>package</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/../../interpreter/lens</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <includeScope>runtime</includeScope> + <artifactItems> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>${project.artifactId}</artifactId> + <version>${project.version}</version> + <type>${project.packaging}</type> + </artifactItem> + </artifactItems> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-clean-plugin</artifactId> + <version>2.4.1</version> + <configuration> + <filesets> + <fileset> + <directory>${basedir}/../interpreter/lens</directory> + 
<followSymlinks>false</followSymlinks> + </fileset> + </filesets> + </configuration> + </plugin> + + </plugins> + </build> + <repositories> + <repository> + <id>inmobi.repo</id> + <url>https://github.com/InMobi/mvn-repo/raw/master/releases</url> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + <repository> + <id>inmobi.snapshots</id> + <url>https://github.com/InMobi/mvn-repo/raw/master/snapshots</url> + <releases> + <enabled>false</enabled> + </releases> + <snapshots> + <enabled>true</enabled> + </snapshots> + </repository> + <repository> + <id>central</id> + <url>http://repo1.maven.org/maven2</url> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + <repository> + <id>cloudera</id> + <url>https://repository.cloudera.com/artifactory/cloudera-repos</url> + <releases> + <enabled>true</enabled> + <updatePolicy>never</updatePolicy> + </releases> + <snapshots> + <enabled>false</enabled> + <updatePolicy>never</updatePolicy> + </snapshots> + </repository> + <repository> + <id>Codehaus repository</id> + <url>http://repository.codehaus.org/</url> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + <repository> + <id>apache.snapshots.repo</id> + <url>https://repository.apache.org/content/groups/snapshots</url> + <name>Apache Snapshots Repository</name> + <releases> + <enabled>false</enabled> + </releases> + <snapshots> + <enabled>true</enabled> + </snapshots> + </repository> + <repository> + <id>default</id> + <url>https://repository.apache.org/content/groups/public/</url> + </repository> + <repository> + <id>projectlombok.org</id> + <url>http://projectlombok.org/mavenrepo</url> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + <!-- see https://jira.springsource.org/browse/SHL-52 --> + <repository> + <id>ext-release-local</id> + <url>http://repo.springsource.org/simple/ext-release-local/</url> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + </repositories> + 
+</project> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1bc5e8df/lens/src/main/java/org/apache/zeppelin/lens/ExecutionDetail.java ---------------------------------------------------------------------- diff --git a/lens/src/main/java/org/apache/zeppelin/lens/ExecutionDetail.java b/lens/src/main/java/org/apache/zeppelin/lens/ExecutionDetail.java new file mode 100644 index 0000000..3f8b9ab --- /dev/null +++ b/lens/src/main/java/org/apache/zeppelin/lens/ExecutionDetail.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zeppelin.lens; + +import org.apache.lens.client.LensClient; +import org.springframework.shell.core.JLineShell; +/** + * Pojo tracking query execution details + * Used to cancel the query + */ +public class ExecutionDetail { + private String queryHandle; + private LensClient lensClient; + private JLineShell shell; + ExecutionDetail(String qh, LensClient lensClient, JLineShell shell) { + this.queryHandle = qh; + this.lensClient = lensClient; + this.shell = shell; + } + public JLineShell getShell() { + return shell; + } + public String getQueryHandle() { + return queryHandle; + } + public LensClient getLensClient() { + return lensClient; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1bc5e8df/lens/src/main/java/org/apache/zeppelin/lens/LensBootstrap.java ---------------------------------------------------------------------- diff --git a/lens/src/main/java/org/apache/zeppelin/lens/LensBootstrap.java b/lens/src/main/java/org/apache/zeppelin/lens/LensBootstrap.java new file mode 100644 index 0000000..2ce9c57 --- /dev/null +++ b/lens/src/main/java/org/apache/zeppelin/lens/LensBootstrap.java @@ -0,0 +1,41 @@ +/* + * Copyright 2011-2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.lens; + +import java.util.Properties; + +import org.springframework.beans.factory.support.DefaultListableBeanFactory; +import org.springframework.beans.factory.support.RootBeanDefinition; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.shell.core.Shell; + +/** + * workaround for https://github.com/spring-projects/spring-shell/issues/73 + */ +public class LensBootstrap extends org.springframework.shell.Bootstrap { + public LensBootstrap() { + super(); + } + public LensJLineShellComponent getLensJLineShellComponent() { + GenericApplicationContext ctx = (GenericApplicationContext) getApplicationContext(); + RootBeanDefinition rbd = new RootBeanDefinition(); + rbd.setBeanClass(LensJLineShellComponent.class); + DefaultListableBeanFactory bf = (DefaultListableBeanFactory) ctx.getBeanFactory(); + bf.registerBeanDefinition(LensJLineShellComponent.class.getSimpleName(), rbd); + return ctx.getBean(LensJLineShellComponent.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1bc5e8df/lens/src/main/java/org/apache/zeppelin/lens/LensInterpreter.java ---------------------------------------------------------------------- diff --git a/lens/src/main/java/org/apache/zeppelin/lens/LensInterpreter.java b/lens/src/main/java/org/apache/zeppelin/lens/LensInterpreter.java new file mode 100644 index 0000000..2632775 --- /dev/null +++ b/lens/src/main/java/org/apache/zeppelin/lens/LensInterpreter.java @@ -0,0 +1,451 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.lens; + +import java.util.List; +import java.util.Properties; +import java.util.regex.Pattern; +import java.util.regex.Matcher; +import java.util.Map; +import java.util.LinkedHashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.io.ByteArrayOutputStream; + +import org.apache.lens.client.LensClient; +import org.apache.lens.client.LensClientConfig; +import org.apache.lens.client.LensClientSingletonWrapper; +import org.apache.lens.cli.commands.BaseLensCommand; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.shell.Bootstrap; +import org.springframework.shell.core.CommandResult; +import org.springframework.shell.core.JLineShell; +import org.springframework.shell.core.JLineShellComponent; +import org.springframework.shell.support.logging.HandlerUtils; + + +/** + * Lens interpreter for Zeppelin. 
+ */ +public class LensInterpreter extends Interpreter { + + static final Logger s_logger = LoggerFactory.getLogger(LensInterpreter.class); + static final String LENS_CLIENT_DBNAME = "lens.client.dbname"; + static final String LENS_SERVER_URL = "lens.server.base.url"; + static final String LENS_SESSION_CLUSTER_USER = "lens.session.cluster.user"; + static final String LENS_PERSIST_RESULTSET = "lens.query.enable.persistent.resultset"; + static final String ZEPPELIN_LENS_RUN_CONCURRENT_SESSION = "zeppelin.lens.run.concurrent"; + static final String ZEPPELIN_LENS_CONCURRENT_SESSIONS = "zeppelin.lens.maxThreads"; + static final String ZEPPELIN_MAX_ROWS = "zeppelin.lens.maxResults"; + static final Map<String, Pattern> LENS_TABLE_FORMAT_REGEX = new LinkedHashMap<String, Pattern>() { + { + put("cubes", Pattern.compile(".*show\\s+cube.*")); + put("nativetables", Pattern.compile(".*show\\s+nativetable.*")); + put("storages", Pattern.compile(".*show\\s+storage.*")); + put("facts", Pattern.compile(".*show\\s+fact.*")); + put("dimensions", Pattern.compile(".*show\\s+dimension.*")); + put("params", Pattern.compile(".*show\\s+param.*")); + put("databases", Pattern.compile(".*show\\s+database.*")); + put("query results", Pattern.compile(".*query\\s+results.*")); + } + }; + + private static Pattern s_queryExecutePattern = Pattern.compile(".*query\\s+execute\\s+(.*)"); + private static Map<String, ExecutionDetail> s_paraToQH = + new ConcurrentHashMap<String, ExecutionDetail> (); //tracks paragraphID -> Lens QueryHandle + private static Map<LensClient, Boolean> s_clientMap = + new ConcurrentHashMap<LensClient, Boolean>(); + + private int m_maxResults; + private int m_maxThreads; + private JLineShell m_shell; + private LensClientConfig m_lensConf; + private Bootstrap m_bs; + private LensClient m_lensClient; + + + static { + Interpreter.register( + "lens", + "lens", + LensInterpreter.class.getName(), + new InterpreterPropertyBuilder() + .add(ZEPPELIN_LENS_RUN_CONCURRENT_SESSION, 
"true", "Run concurrent Lens Sessions") + .add(ZEPPELIN_LENS_CONCURRENT_SESSIONS, "10", + "If concurrency is true then how many threads?") + .add(ZEPPELIN_MAX_ROWS, "1000", "max number of rows to display") + .add(LENS_SERVER_URL, "http://<hostname>:<port>/lensapi", "The URL for Lens Server") + .add(LENS_CLIENT_DBNAME, "default", "The database schema name") + .add(LENS_PERSIST_RESULTSET, "false", "Apache Lens to persist result in HDFS?") + .add(LENS_SESSION_CLUSTER_USER, "default", "Hadoop cluster username").build()); + } + + public LensInterpreter(Properties property) { + super(property); + try { + m_lensConf = new LensClientConfig(); + m_lensConf.set(LENS_SERVER_URL, property.get(LENS_SERVER_URL).toString()); + m_lensConf.set(LENS_CLIENT_DBNAME, property.get(LENS_CLIENT_DBNAME).toString()); + m_lensConf.set(LENS_SESSION_CLUSTER_USER, property.get(LENS_SESSION_CLUSTER_USER).toString()); + m_lensConf.set(LENS_PERSIST_RESULTSET, property.get(LENS_PERSIST_RESULTSET).toString()); + try { + m_maxResults = Integer.parseInt(property.get(ZEPPELIN_MAX_ROWS).toString()); + } catch (NumberFormatException|NullPointerException e) { + m_maxResults = 1000; + s_logger.error("unable to parse " + ZEPPELIN_MAX_ROWS + " :" + + property.get(ZEPPELIN_MAX_ROWS), e); + } + try { + m_maxThreads = Integer.parseInt(property.get(ZEPPELIN_LENS_CONCURRENT_SESSIONS).toString()); + } catch (NumberFormatException|NullPointerException e) { + m_maxThreads = 10; + s_logger.error("unable to parse " + ZEPPELIN_LENS_CONCURRENT_SESSIONS + " :" + + property.get(ZEPPELIN_LENS_CONCURRENT_SESSIONS), e); + } + s_logger.info("LensInterpreter created"); + } + catch (Exception e) { + e.printStackTrace(); + s_logger.error("unable to create lens interpreter", e); + } + } + + private Bootstrap createBootstrap() { + return new LensBootstrap(); + } + + private JLineShell getJLineShell(Bootstrap bs) { + if (bs instanceof LensBootstrap) { + return ((LensBootstrap) bs).getLensJLineShellComponent(); + } else { + return 
bs.getJLineShellComponent(); + } + } + + protected void init() { + try { + m_bs = createBootstrap(); + m_shell = getJLineShell(m_bs); + } catch (Exception ex) { + s_logger.error("could not initialize commandLine", ex); + } + } + + @Override + public void open() { + s_logger.info("LensInterpreter opening"); + m_lensClient = new LensClient(m_lensConf); + LensClientSingletonWrapper.instance().setClient(m_lensClient); + init(); + s_logger.info("LensInterpreter opened"); + } + + @Override + public void close() { + closeConnections(); + s_logger.info("LensInterpreter closed"); + } + + private static void closeConnections() { + for (LensClient cl : s_clientMap.keySet()) { + if (cl.isConnectionOpen()) { + closeLensClient(cl); + } + } + } + + private static void closeLensClient(LensClient lensClient) { + try { + lensClient.closeConnection(); + } catch (Exception e) { + s_logger.error("unable to close lensClient", e); + } + } + + private LensClient createAndSetLensClient(Bootstrap bs) { + LensClient lensClient = null; + try { + lensClient = new LensClient(m_lensConf); + + for (String beanName : bs.getApplicationContext().getBeanDefinitionNames()) { + if (bs.getApplicationContext().getBean(beanName) instanceof BaseLensCommand) { + ((BaseLensCommand) bs.getApplicationContext().getBean(beanName)) + .setClient(lensClient); + } + } + } catch (Exception e) { + s_logger.error("unable to create lens client", e); + throw e; + } + return lensClient; + } + + private InterpreterResult HandleHelp(JLineShell shell, String st) { + java.util.logging.StreamHandler sh = null; + java.util.logging.Logger springLogger = null; + java.util.logging.Formatter formatter = new java.util.logging.Formatter() { + public String format(java.util.logging.LogRecord record) { + return record.getMessage(); + } + }; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + sh = new java.util.logging.StreamHandler(baos, formatter); + springLogger = 
HandlerUtils.getLogger(org.springframework.shell.core.SimpleParser.class); + springLogger.addHandler(sh); + shell.executeCommand(st); + } catch (Exception e) { + s_logger.error(e.getMessage(), e); + return new InterpreterResult(Code.ERROR, e.getMessage()); + } + finally { + sh.flush(); + springLogger.removeHandler(sh); + sh.close(); + } + return new InterpreterResult(Code.SUCCESS, baos.toString()); + } + + private String modifyQueryStatement(String st) { + Matcher matcher = s_queryExecutePattern.matcher(st.toLowerCase()); + if (!matcher.find()) { + return st; + } + StringBuilder sb = new StringBuilder("query execute "); + if (!st.toLowerCase().matches(".*--async\\s+true")) { + sb.append("--async true "); + } + sb.append(matcher.group(1)); + if (!st.toLowerCase().matches(".*limit\\s+\\d+.*")) { + sb.append(" limit "); + sb.append(m_maxResults); + } + return sb.toString(); + } + + @Override + public InterpreterResult interpret(String input, InterpreterContext context) { + if (input == null || input.length() == 0) { + return new InterpreterResult(Code.ERROR, "no command submitted"); + } + String st = input.replaceAll("\\n", " "); + s_logger.info("LensInterpreter command: " + st); + + Bootstrap bs = createBootstrap(); + JLineShell shell = getJLineShell(bs); + CommandResult res = null; + LensClient lensClient = null; + String qh = null; + + if (st.trim().startsWith("help")) { + return HandleHelp(shell, st); + } + + try { + lensClient = createAndSetLensClient(bs); + s_clientMap.put(lensClient, true); + + String lensCommand = modifyQueryStatement(st); + + s_logger.info("executing command : " + lensCommand); + res = shell.executeCommand(lensCommand); + + if (!lensCommand.equals(st) && res != null + && res.getResult() != null + && res.getResult().toString().trim().matches("[a-z0-9-]+")) { + // setup query progress tracking + qh = res.getResult().toString(); + s_paraToQH.put(context.getParagraphId(), + new ExecutionDetail(qh, lensClient, shell)); + String getResultsCmd = 
"query results --async false " + qh; + s_logger.info("executing query results command : " + context.getParagraphId() + + " : " + getResultsCmd); + res = shell.executeCommand(getResultsCmd); + s_paraToQH.remove(context.getParagraphId()); + } + } catch (Exception ex) { + s_logger.error("error in interpret", ex); + return new InterpreterResult(Code.ERROR, ex.getMessage()); + } + finally { + if (shell != null) { + closeShell(shell); + } + if (lensClient != null) { + closeLensClient(lensClient); + s_clientMap.remove(lensClient); + } + if (qh != null) { + s_paraToQH.remove(context.getParagraphId()); + } + } + return new InterpreterResult(Code.SUCCESS, formatResult(st, res)); + } + + private void closeShell(JLineShell shell) { + if (shell instanceof LensJLineShellComponent) { + ((LensJLineShellComponent) shell).stop(); + } else { + ((JLineShellComponent) shell).stop(); + } + } + + private String formatResult(String st, CommandResult result) { + if (result == null) { + return "error in interpret, no result object returned"; + } + if (!result.isSuccess() || result.getResult() == null) { + if (result.getException() != null) { + return result.getException().getMessage(); + //try describe cube (without cube name)- error is written as a warning, + //but not returned to result object + } else { + return "error in interpret, unable to execute command"; + } + } + StringBuilder sb = new StringBuilder(); + for (Map.Entry<String, Pattern> entry : LENS_TABLE_FORMAT_REGEX.entrySet()) { + if (entry.getValue().matcher(st.toLowerCase()).find()) { + sb.append("%table " + entry.getKey() + " \n"); + break; + } + } + if (s_queryExecutePattern.matcher(st.toLowerCase()).find() && + result.getResult().toString().contains(" rows process in (")) { + sb.append("%table "); + } + if (sb.length() > 0) { + return sb.append(result.getResult().toString()).toString(); + } + return result.getResult().toString(); + //Lens sends error messages without setting result.isSuccess() = false. 
+ } + + @Override + public void cancel(InterpreterContext context) { + if (!s_paraToQH.containsKey(context.getParagraphId())) { + s_logger.error("ignoring cancel from " + context.getParagraphId()); + return; + } + String qh = s_paraToQH.get(context.getParagraphId()).getQueryHandle(); + s_logger.info("preparing to cancel : (" + context.getParagraphId() + ") :" + qh); + Bootstrap bs = createBootstrap(); + JLineShell shell = getJLineShell(bs); + LensClient lensClient = null; + try { + lensClient = createAndSetLensClient(bs); + s_clientMap.put(lensClient, true); + s_logger.info("invoke query kill (" + context.getParagraphId() + ") " + qh); + CommandResult res = shell.executeCommand("query kill " + qh); + s_logger.info("query kill returned (" + context.getParagraphId() + ") " + qh + + " with: " + res.getResult()); + } catch (Exception e) { + s_logger.error("unable to kill query (" + + context.getParagraphId() + ") " + qh, e); + } finally { + try { + if (lensClient != null) { + closeLensClient(lensClient); + s_clientMap.remove(lensClient); + } + closeLensClient(s_paraToQH.get(context.getParagraphId()).getLensClient()); + closeShell(s_paraToQH.get(context.getParagraphId()).getShell()); + } catch (Exception e) { + // ignore + } + s_paraToQH.remove(context.getParagraphId()); + closeShell(shell); + } + } + + @Override + public FormType getFormType() { + return FormType.SIMPLE; + } + + @Override + public int getProgress(InterpreterContext context) { + if (s_paraToQH.containsKey(context.getParagraphId())) { + s_logger.info("number of items for which progress can be reported :" + s_paraToQH.size()); + s_logger.info("number of open lensclient :" + s_clientMap.size()); + Bootstrap bs = createBootstrap(); + JLineShell shell = getJLineShell(bs); + LensClient lensClient = null; + String qh = s_paraToQH.get(context.getParagraphId()).getQueryHandle(); + try { + s_logger.info("fetch query status for : (" + context.getParagraphId() + ") :" + qh); + lensClient = 
createAndSetLensClient(bs); + s_clientMap.put(lensClient, true); + CommandResult res = shell.executeCommand("query status " + qh); + s_logger.info(context.getParagraphId() + " --> " + res.getResult().toString()); + //change to debug + Pattern pattern = Pattern.compile(".*(Progress : (\\d\\.\\d)).*"); + Matcher matcher = pattern.matcher(res.getResult().toString().replaceAll("\\n", " ")); + if (matcher.find(2)) { + Double d = Double.parseDouble(matcher.group(2)) * 100; + if (d.intValue() == 100) { + s_paraToQH.remove(context.getParagraphId()); + } + return d.intValue(); + } else { + return 1; + } + } + catch (Exception e) { + s_logger.error("unable to get progress for (" + context.getParagraphId() + ") :" + qh, e); + s_paraToQH.remove(context.getParagraphId()); + return 0; + } finally { + if (lensClient != null) { + closeLensClient(lensClient); + s_clientMap.remove(lensClient); + } + if (shell != null) { + closeShell(shell); + } + } + } + return 0; + } + + @Override + public List<String> completion(String buf, int cursor) { + return null; + } + + public boolean concurrentRequests() { + return Boolean.parseBoolean(getProperty(ZEPPELIN_LENS_RUN_CONCURRENT_SESSION)); + } + @Override + public Scheduler getScheduler() { + if (concurrentRequests()) { + return SchedulerFactory.singleton().createOrGetParallelScheduler( + LensInterpreter.class.getName() + this.hashCode(), m_maxThreads); + } else { + return super.getScheduler(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1bc5e8df/lens/src/main/java/org/apache/zeppelin/lens/LensJLineShellComponent.java ---------------------------------------------------------------------- diff --git a/lens/src/main/java/org/apache/zeppelin/lens/LensJLineShellComponent.java b/lens/src/main/java/org/apache/zeppelin/lens/LensJLineShellComponent.java new file mode 100644 index 0000000..f025d53 --- /dev/null +++ b/lens/src/main/java/org/apache/zeppelin/lens/LensJLineShellComponent.java @@ -0,0 +1,244 @@ +/* + * 
Copyright 2011-2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.lens; + +import java.util.Map; + +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactoryUtils; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.SmartLifecycle; +import org.springframework.shell.CommandLine; +import org.springframework.shell.plugin.BannerProvider; +import org.springframework.shell.plugin.HistoryFileNameProvider; +import org.springframework.shell.plugin.PluginUtils; +import org.springframework.shell.plugin.PromptProvider; +import org.springframework.shell.core.*; + +/** + * workaround for https://github.com/spring-projects/spring-shell/issues/73 + */ +public class LensJLineShellComponent extends JLineShell + implements SmartLifecycle, ApplicationContextAware, InitializingBean { + + @Autowired + private CommandLine commandLine; + + private volatile boolean running = false; + private Thread shellThread; + + private ApplicationContext applicationContext; + private boolean printBanner = true; + + private String historyFileName; + private String promptText; + private String productName; + private String banner; + private String version; + 
private String welcomeMessage; + + private ExecutionStrategy executionStrategy = new LensSimpleExecutionStrategy(); + private SimpleParser parser = new SimpleParser(); + + public SimpleParser getSimpleParser() { + return parser; + } + + public boolean isAutoStartup() { + return false; + } + + public void stop(Runnable callback) { + stop(); + callback.run(); + } + + public int getPhase() { + return 1; + } + + public void start() { + //customizePlug must run before start thread to take plugin's configuration into effect + customizePlugin(); + shellThread = new Thread(this, "Spring Shell"); + shellThread.start(); + running = true; + } + + + public void stop() { + if (running) { + closeShell(); + running = false; + } + } + + public boolean isRunning() { + return running; + } + + @SuppressWarnings("rawtypes") + public void afterPropertiesSet() { + + Map<String, CommandMarker> commands = + BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, + CommandMarker.class); + for (CommandMarker command : commands.values()) { + getSimpleParser().add(command); + } + + Map<String, Converter> converters = BeanFactoryUtils + .beansOfTypeIncludingAncestors(applicationContext, Converter.class); + for (Converter<?> converter : converters.values()) { + getSimpleParser().add(converter); + } + + setHistorySize(commandLine.getHistorySize()); + if (commandLine.getShellCommandsToExecute() != null) { + setPrintBanner(false); + } + } + + /** + * wait the shell command to complete by typing "quit" or "exit" + * + */ + public void waitForComplete() { + try { + shellThread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + protected ExecutionStrategy getExecutionStrategy() { + return executionStrategy; + } + + @Override + protected Parser getParser() { + return parser; + } + + @Override + public String getStartupNotifications() { + return null; + } + + public void setApplicationContext(ApplicationContext applicationContext) throws 
BeansException { + this.applicationContext = applicationContext; + } + + public void customizePlugin() { + this.historyFileName = getHistoryFileName(); + this.promptText = getPromptText(); + String[] banner = getBannerText(); + this.banner = banner[0]; + this.welcomeMessage = banner[1]; + this.version = banner[2]; + this.productName = banner[3]; + } + + /** + * get history file name from provider. The provider has highest order + * <link>org.springframework.core.Ordered.getOder</link> will win. + * + * @return history file name + */ + protected String getHistoryFileName() { + HistoryFileNameProvider historyFileNameProvider = PluginUtils + .getHighestPriorityProvider(this.applicationContext, HistoryFileNameProvider.class); + String providerHistoryFileName = historyFileNameProvider.getHistoryFileName(); + if (providerHistoryFileName != null) { + return providerHistoryFileName; + } else { + return historyFileName; + } + } + + /** + * get prompt text from provider. The provider has highest order + * <link>org.springframework.core.Ordered.getOder</link> will win. + * + * @return prompt text + */ + protected String getPromptText() { + PromptProvider promptProvider = PluginUtils + .getHighestPriorityProvider(this.applicationContext, PromptProvider.class); + String providerPromptText = promptProvider.getPrompt(); + if (providerPromptText != null) { + return providerPromptText; + } else { + return promptText; + } + } + + /** + * Get Banner and Welcome Message from provider. The provider has highest order + * <link>org.springframework.core.Ordered.getOder</link> will win. 
+ * @return BannerText[0]: Banner + * BannerText[1]: Welcome Message + * BannerText[2]: Version + * BannerText[3]: Product Name + */ + private String[] getBannerText() { + String[] bannerText = new String[4]; + BannerProvider provider = PluginUtils + .getHighestPriorityProvider(this.applicationContext, BannerProvider.class); + bannerText[0] = provider.getBanner(); + bannerText[1] = provider.getWelcomeMessage(); + bannerText[2] = provider.getVersion(); + bannerText[3] = provider.getProviderName(); + return bannerText; + } + + public void printBannerAndWelcome() { + if (printBanner) { + logger.info(this.banner); + logger.info(getWelcomeMessage()); + } + } + + + /** + * get the welcome message at start. + * + * @return welcome message + */ + public String getWelcomeMessage() { + return this.welcomeMessage; + } + + + /** + * @param printBanner the printBanner to set + */ + public void setPrintBanner(boolean printBanner) { + this.printBanner = printBanner; + } + + protected String getProductName() { + return productName; + } + + protected String getVersion() { + return version; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1bc5e8df/lens/src/main/java/org/apache/zeppelin/lens/LensSimpleExecutionStrategy.java ---------------------------------------------------------------------- diff --git a/lens/src/main/java/org/apache/zeppelin/lens/LensSimpleExecutionStrategy.java b/lens/src/main/java/org/apache/zeppelin/lens/LensSimpleExecutionStrategy.java new file mode 100644 index 0000000..e3294ad --- /dev/null +++ b/lens/src/main/java/org/apache/zeppelin/lens/LensSimpleExecutionStrategy.java @@ -0,0 +1,86 @@ +/* + * Copyright 2011-2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.lens; + +import org.springframework.shell.core.*; + +import java.util.logging.Logger; + +import org.springframework.shell.event.ParseResult; +import org.springframework.shell.support.logging.HandlerUtils; +import org.springframework.util.Assert; +import org.springframework.util.ReflectionUtils; + +/** + * workaround for https://github.com/spring-projects/spring-shell/issues/73 + */ +public class LensSimpleExecutionStrategy implements ExecutionStrategy { + + private static final Logger logger = HandlerUtils.getLogger(LensSimpleExecutionStrategy.class); + + public Object execute(ParseResult parseResult) throws RuntimeException { + Assert.notNull(parseResult, "Parse result required"); + logger.info("LensSimpleExecutionStrategy execute method invoked"); + synchronized (this) { + Assert.isTrue(isReadyForCommands(), "SimpleExecutionStrategy not yet ready for commands"); + Object target = parseResult.getInstance(); + if (target instanceof ExecutionProcessor) { + ExecutionProcessor processor = ((ExecutionProcessor) target); + parseResult = processor.beforeInvocation(parseResult); + try { + Object result = invoke(parseResult); + processor.afterReturningInvocation(parseResult, result); + return result; + } catch (Throwable th) { + processor.afterThrowingInvocation(parseResult, th); + return handleThrowable(th); + } + } + else { + return invoke(parseResult); + } + } + } + + private Object invoke(ParseResult parseResult) { + try { + return ReflectionUtils.invokeMethod(parseResult.getMethod(), + parseResult.getInstance(), 
parseResult.getArguments()); + } catch (Throwable th) { + logger.severe("Command failed " + th); + return handleThrowable(th); + } + } + + private Object handleThrowable(Throwable th) { + if (th instanceof Error) { + throw ((Error) th); + } + if (th instanceof RuntimeException) { + throw ((RuntimeException) th); + } + throw new RuntimeException(th); + } + + public boolean isReadyForCommands() { + return true; + } + + public void terminate() { + // do nothing + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1bc5e8df/lens/src/test/java/org/apache/zeppelin/lens/LensInterpreterTest.java ---------------------------------------------------------------------- diff --git a/lens/src/test/java/org/apache/zeppelin/lens/LensInterpreterTest.java b/lens/src/test/java/org/apache/zeppelin/lens/LensInterpreterTest.java new file mode 100644 index 0000000..5af8deb --- /dev/null +++ b/lens/src/test/java/org/apache/zeppelin/lens/LensInterpreterTest.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zeppelin.lens; + +import static org.junit.Assert.assertEquals; + +import java.util.Properties; + +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import static org.apache.zeppelin.lens.LensInterpreter.*; + +/** + * Lens interpreter unit tests + */ +public class LensInterpreterTest { + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void test() { + Properties prop = new Properties(); + prop.setProperty(LENS_SERVER_URL, "http://127.0.0.1:9999/lensapi"); + prop.setProperty(LENS_CLIENT_DBNAME, "default"); + prop.setProperty(LENS_PERSIST_RESULTSET, "false"); + prop.setProperty(LENS_SESSION_CLUSTER_USER, "default"); + prop.setProperty(ZEPPELIN_MAX_ROWS, "1000"); + prop.setProperty(ZEPPELIN_LENS_RUN_CONCURRENT_SESSION, "true"); + prop.setProperty(ZEPPELIN_LENS_CONCURRENT_SESSIONS, "10"); + LensInterpreter t = new MockLensInterpreter(prop); + t.open(); + //simple help test + InterpreterResult result = t.interpret("help", null); + assertEquals(result.type(), InterpreterResult.Type.TEXT); + //assertEquals("unable to find 'query execute' in help message", + // result.message().contains("query execute"), result.message()); + t.close(); + } + + class MockLensInterpreter extends LensInterpreter { + public MockLensInterpreter(Properties property) { + super(property); + } + @Override + public void open() { + super.init(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1bc5e8df/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 540e4d3..8b4762e 100644 --- a/pom.xml +++ b/pom.xml @@ -94,6 +94,7 @@ <module>tajo</module> <module>flink</module> <module>ignite</module> + <module>lens</module> <module>zeppelin-web</module> <module>zeppelin-server</module> <module>zeppelin-distribution</module> 
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1bc5e8df/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 9f05742..ecdefd4 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -399,7 +399,8 @@ public class ZeppelinConfiguration extends XMLConfiguration { + "org.apache.zeppelin.tajo.TajoInterpreter," + "org.apache.zeppelin.flink.FlinkInterpreter," + "org.apache.zeppelin.ignite.IgniteInterpreter," - + "org.apache.zeppelin.ignite.IgniteSqlInterpreter"), + + "org.apache.zeppelin.ignite.IgniteSqlInterpreter," + + "org.apache.zeppelin.lens.LensInterpreter"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
