Repository: incubator-zeppelin Updated Branches: refs/heads/master 9186b2186 -> f04425bc9
ZEPPELIN-189: Add Apache Geode Interpreter for Zeppelin Syntax look like this: %geode.oql SELECT * FROM /regionOne o WHERE o.field1 > 146 AND o.field2 LIKE '%some%'; Author: tzolov <[email protected]> Closes #173 from tzolov/ZEPPELIN-189 and squashes the following commits: c5eef90 [tzolov] ZEPPELIN-189: Close previous connection before opening new one ab404e3 [tzolov] ZEPPELIN-189: Add GeodeOqlInterpreter to ZeppelinConfiguration's default interpreters list 0ff81c9 [tzolov] ZEPPELIN-189: Add Max number of OQL result to display 437c091 [tzolov] ZEPPELIN-189: Improve javadoc documentation 9b701f9 [tzolov] ZEPPELIN-189: Clean pom formatting and remove obsolete dependencies c410e2e [tzolov] ZEPPELIN-189: Fix wrong interpreter name in pom cd6294b [tzolov] ZEPPELIN-189: Handle responses containing reserved characters 14b8a37 [tzolov] ZEPPELIN-189: Add missing license tag ef7defc [tzolov] ZEPPELIN-189: Add Apache Geode Interpreter for Zeppelin Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/f04425bc Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/f04425bc Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/f04425bc Branch: refs/heads/master Commit: f04425bc99866c7386533103c403adcfafb08ea9 Parents: 9186b21 Author: tzolov <[email protected]> Authored: Thu Aug 6 08:45:17 2015 +0200 Committer: Alexander Bezzubov <[email protected]> Committed: Fri Aug 7 08:20:21 2015 +0900 ---------------------------------------------------------------------- conf/zeppelin-site.xml.template | 2 +- geode/pom.xml | 151 +++++++++ .../zeppelin/geode/GeodeOqlInterpreter.java | 319 +++++++++++++++++++ .../zeppelin/geode/GeodeOqlInterpreterTest.java | 181 +++++++++++ pom.xml | 1 + .../zeppelin/conf/ZeppelinConfiguration.java | 3 +- 6 files changed, 655 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f04425bc/conf/zeppelin-site.xml.template ---------------------------------------------------------------------- diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 13e4d1d..f48a960 100644 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -72,7 +72,7 @@ <property> <name>zeppelin.interpreters</name> - <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter</value> + <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter</value> <description>Comma separated interpreter configurations. First interpreter become a default</description> </property> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f04425bc/geode/pom.xml ---------------------------------------------------------------------- diff --git a/geode/pom.xml b/geode/pom.xml new file mode 100644 index 0000000..3fa7cb3 --- /dev/null +++ b/geode/pom.xml @@ -0,0 +1,151 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>zeppelin</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.6.0-incubating-SNAPSHOT</version> + </parent> + + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-geode</artifactId> + <packaging>jar</packaging> + <version>0.6.0-incubating-SNAPSHOT</version> + <name>Zeppelin: Apache Geode interpreter</name> + <url>http://geode.incubator.apache.org/</url> + + <properties> + <geode.version>1.0.0-incubating-SNAPSHOT</geode.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-interpreter</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.geode</groupId> + <artifactId>gemfire-core</artifactId> + <version>${geode.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-exec</artifactId> + <version>1.1</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>1.9.5</version> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-deploy-plugin</artifactId> + <version>2.7</version> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + + <plugin> + <artifactId>maven-enforcer-plugin</artifactId> + <version>1.3.1</version> + <executions> + <execution> + <id>enforce</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.8</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/../../interpreter/geode</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <includeScope>runtime</includeScope> + </configuration> + </execution> + <execution> + <id>copy-artifact</id> + <phase>package</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/../../interpreter/geode</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <includeScope>runtime</includeScope> + <artifactItems> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>${project.artifactId}</artifactId> + <version>${project.version}</version> + <type>${project.packaging}</type> + </artifactItem> + </artifactItems> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f04425bc/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java ---------------------------------------------------------------------- diff --git a/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java b/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java new file mode 100644 index 0000000..6f6b440 --- /dev/null +++ b/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java @@ -0,0 +1,319 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.zeppelin.geode; + +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.gemstone.gemfire.cache.client.ClientCache; +import com.gemstone.gemfire.cache.client.ClientCacheFactory; +import com.gemstone.gemfire.cache.query.QueryService; +import com.gemstone.gemfire.cache.query.SelectResults; +import com.gemstone.gemfire.cache.query.Struct; +import com.gemstone.gemfire.pdx.PdxInstance; + +/** + * Apache Geode OQL Interpreter (http://geode.incubator.apache.org) + * + * <ul> + * <li>{@code geode.locator.host} - The Geode Locator {@code <HOST>} to connect to.</li> + * <li>{@code geode.locator.port} - The Geode Locator {@code <PORT>} to connect to.</li> + * <li>{@code geode.max.result} - Max number of OQL result to display.</li> + * </ul> + * <p> + * Sample usages: <br/> + * {@code %geode.oql} <br/> + * {@code SELECT * FROM /regionEmployee e WHERE e.companyId > 95} <br/> + * {@code SELECT * FROM /regionEmployee ORDER BY employeeId} <br/> + * {@code + * SELECT * FROM /regionEmployee + * WHERE companyId IN SET(1, 3, 7) OR lastName IN SET('NameA', 'NameB') + * } <br/> + * {@code + * SELECT e.employeeId, c.id as companyId FROM /regionEmployee e, /regionCompany c + * WHERE e.companyId = c.id + * } + * </p> + * <p> + * OQL specification and sample queries: + * http://geode-docs.cfapps.io/docs/getting_started/querying_quick_reference.html + * </p> + * <p> + * When the Zeppelin server is collocated with Geode Shell (gfsh) one can use the %sh interpreter to + * run Geode shell commands: <br/> + * {@code + * %sh + * source /etc/geode/conf/geode-env.sh + * gfsh << EOF + * connect --locator=ambari.localdomain[10334] + * destroy region --name=/regionEmployee + * create region --name=regionEmployee --type=REPLICATE + * exit; + * EOF + *} + * </p> + * <p> + * Known issue:http://gemfire.docs.pivotal.io/bugnotes/KnownIssuesGemFire810.html #43673 Using query + * "select * from /exampleRegion.entrySet" fails in a client-server topology and/or in a + * PartitionedRegion. + * </p> + */ +public class GeodeOqlInterpreter extends Interpreter { + + private Logger logger = LoggerFactory.getLogger(GeodeOqlInterpreter.class); + + public static final String DEFAULT_PORT = "10334"; + public static final String DEFAULT_HOST = "localhost"; + public static final String DEFAULT_MAX_RESULT = "1000"; + + private static final char NEWLINE = '\n'; + private static final char TAB = '\t'; + private static final char WHITESPACE = ' '; + + private static final String TABLE_MAGIC_TAG = "%table "; + + public static final String LOCATOR_HOST = "geode.locator.host"; + public static final String LOCATOR_PORT = "geode.locator.port"; + public static final String MAX_RESULT = "geode.max.result"; + + static { + Interpreter.register( + "oql", + "geode", + GeodeOqlInterpreter.class.getName(), + new InterpreterPropertyBuilder().add(LOCATOR_HOST, DEFAULT_HOST, "The Geode Locator Host.") + .add(LOCATOR_PORT, DEFAULT_PORT, "The Geode Locator Port") + .add(MAX_RESULT, DEFAULT_MAX_RESULT, "Max number of OQL result to display.").build()); + } + + private ClientCache clientCache = null; + private QueryService queryService = null; + private Exception exceptionOnConnect; + private int maxResult; + + public GeodeOqlInterpreter(Properties property) { + super(property); + } + + protected ClientCache getClientCache() { + + String locatorHost = getProperty(LOCATOR_HOST); + int locatorPort = Integer.valueOf(getProperty(LOCATOR_PORT)); + + ClientCache clientCache = + new ClientCacheFactory().addPoolLocator(locatorHost, locatorPort).create(); + + return clientCache; + } + + @Override + public void open() { + logger.info("Geode open connection called!"); + + // Close the previous open connections. + close(); + + try { + maxResult = Integer.valueOf(getProperty(MAX_RESULT)); + + clientCache = getClientCache(); + queryService = clientCache.getQueryService(); + + exceptionOnConnect = null; + logger.info("Successfully created Geode connection"); + } catch (Exception e) { + logger.error("Cannot open connection", e); + exceptionOnConnect = e; + } + } + + @Override + public void close() { + try { + if (clientCache != null) { + clientCache.close(); + } + + if (queryService != null) { + queryService.closeCqs(); + } + + } catch (Exception e) { + logger.error("Cannot close connection", e); + } finally { + clientCache = null; + queryService = null; + exceptionOnConnect = null; + } + } + + private InterpreterResult executeOql(String oql) { + try { + + if (getExceptionOnConnect() != null) { + return new InterpreterResult(Code.ERROR, getExceptionOnConnect().getMessage()); + } + + @SuppressWarnings("unchecked") + SelectResults<Object> results = + (SelectResults<Object>) getQueryService().newQuery(oql).execute(); + + StringBuilder msg = new StringBuilder(TABLE_MAGIC_TAG); + boolean isTableHeaderSet = false; + + Iterator<Object> iterator = results.iterator(); + int rowDisplayCount = 0; + + while (iterator.hasNext() && (rowDisplayCount < getMaxResult())) { + + Object entry = iterator.next(); + rowDisplayCount++; + + if (entry instanceof Number) { + handleNumberEntry(isTableHeaderSet, entry, msg); + } else if (entry instanceof Struct) { + handleStructEntry(isTableHeaderSet, entry, msg); + } else if (entry instanceof PdxInstance) { + handlePdxInstanceEntry(isTableHeaderSet, entry, msg); + } else { + handleUnsupportedTypeEntry(isTableHeaderSet, entry, msg); + } + + isTableHeaderSet = true; + msg.append(NEWLINE); + } + + return new InterpreterResult(Code.SUCCESS, msg.toString()); + + } catch (Exception ex) { + logger.error("Cannot run " + oql, ex); + return new InterpreterResult(Code.ERROR, ex.getMessage()); + } + } + + /** + * Zeppelin's %TABLE convention uses tab (\t) to delimit fields and new-line (\n) to delimit rows + * To complain with this convention we need to replace any occurrences of tab and/or newline + * characters in the content. + */ + private String replaceReservedChars(String str) { + + if (StringUtils.isBlank(str)) { + return str; + } + + return str.replace(TAB, WHITESPACE).replace(NEWLINE, WHITESPACE); + } + + private void handleStructEntry(boolean isHeaderSet, Object entry, StringBuilder msg) { + Struct struct = (Struct) entry; + if (!isHeaderSet) { + for (String titleName : struct.getStructType().getFieldNames()) { + msg.append(replaceReservedChars(titleName)).append(TAB); + } + msg.append(NEWLINE); + } + + for (String titleName : struct.getStructType().getFieldNames()) { + msg.append(replaceReservedChars("" + struct.get(titleName))).append(TAB); + } + } + + private void handlePdxInstanceEntry(boolean isHeaderSet, Object entry, StringBuilder msg) { + PdxInstance pdxEntry = (PdxInstance) entry; + if (!isHeaderSet) { + for (String titleName : pdxEntry.getFieldNames()) { + msg.append(replaceReservedChars(titleName)).append(TAB); + } + msg.append(NEWLINE); + } + + for (String titleName : pdxEntry.getFieldNames()) { + msg.append(replaceReservedChars("" + pdxEntry.getField(titleName))).append(TAB); + } + } + + private void handleNumberEntry(boolean isHeaderSet, Object entry, StringBuilder msg) { + if (!isHeaderSet) { + msg.append("Result").append(NEWLINE); + } + msg.append((Number) entry); + } + + private void handleUnsupportedTypeEntry(boolean isHeaderSet, Object entry, StringBuilder msg) { + if (!isHeaderSet) { + msg.append("Unsuppoted Type").append(NEWLINE); + } + msg.append("" + entry); + } + + + @Override + public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) { + logger.info("Run OQL command '{}'", cmd); + return executeOql(cmd); + } + + @Override + public void cancel(InterpreterContext context) { + // Do nothing + } + + @Override + public FormType getFormType() { + return FormType.SIMPLE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler( + GeodeOqlInterpreter.class.getName() + this.hashCode()); + } + + @Override + public List<String> completion(String buf, int cursor) { + return null; + } + + public int getMaxResult() { + return maxResult; + } + + // Test only + QueryService getQueryService() { + return this.queryService; + } + + Exception getExceptionOnConnect() { + return this.exceptionOnConnect; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f04425bc/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java ---------------------------------------------------------------------- diff --git a/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java b/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java new file mode 100644 index 0000000..78755eb --- /dev/null +++ b/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.zeppelin.geode; + +import static org.apache.zeppelin.geode.GeodeOqlInterpreter.*; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.Properties; + +import org.apache.zeppelin.interpreter.Interpreter.FormType; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.junit.Test; + +import com.gemstone.gemfire.cache.query.QueryService; +import com.gemstone.gemfire.cache.query.SelectResults; +import com.gemstone.gemfire.cache.query.Struct; +import com.gemstone.gemfire.cache.query.internal.StructImpl; +import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl; +import com.gemstone.gemfire.pdx.PdxInstance; +import com.gemstone.gemfire.pdx.internal.PdxInstanceImpl; +import com.gemstone.gemfire.pdx.internal.PdxType; + +public class GeodeOqlInterpreterTest { + + private static final String OQL_QUERY = "select * from /region"; + + private static Iterator<Object> asIterator(Object... items) { + return new ArrayList<Object>(Arrays.asList(items)).iterator(); + } + + @Test + public void testOpenCommandIndempotency() { + + Properties properties = new Properties(); + properties.put(LOCATOR_HOST, DEFAULT_HOST); + properties.put(LOCATOR_PORT, DEFAULT_PORT); + properties.put(MAX_RESULT, DEFAULT_MAX_RESULT); + + GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(properties)); + + // Ensure that an attempt to open new connection will clean any remaining connections + spyGeodeOqlInterpreter.open(); + spyGeodeOqlInterpreter.open(); + spyGeodeOqlInterpreter.open(); + + verify(spyGeodeOqlInterpreter, times(3)).open(); + verify(spyGeodeOqlInterpreter, times(3)).close(); + } + + @Test + public void oqlNumberResponse() throws Exception { + testOql(asIterator(66, 67), "Result\n66\n67\n", 10); + testOql(asIterator(66, 67), "Result\n66\n", 1); + } + + @Test + public void oqlStructResponse() throws Exception { + String[] fields = new String[] {"field1", "field2"}; + Struct s1 = new StructImpl(new StructTypeImpl(fields), new String[] {"val11", "val12"}); + Struct s2 = new StructImpl(new StructTypeImpl(fields), new String[] {"val21", "val22"}); + + testOql(asIterator(s1, s2), "field1\tfield2\t\nval11\tval12\t\nval21\tval22\t\n", 10); + testOql(asIterator(s1, s2), "field1\tfield2\t\nval11\tval12\t\n", 1); + } + + @Test + public void oqlStructResponseWithReservedCharacters() throws Exception { + String[] fields = new String[] {"fi\teld1", "f\nield2"}; + Struct s1 = new StructImpl(new StructTypeImpl(fields), new String[] {"v\nal\t1", "val2"}); + + testOql(asIterator(s1), "fi eld1\tf ield2\t\nv al 1\tval2\t\n", 10); + } + + @Test + public void oqlPdxInstanceResponse() throws Exception { + ByteArrayInputStream bais = new ByteArrayInputStream("koza\tboza\n".getBytes()); + PdxInstance pdx1 = new PdxInstanceImpl(new PdxType(), new DataInputStream(bais), 4); + PdxInstance pdx2 = new PdxInstanceImpl(new PdxType(), new DataInputStream(bais), 4); + + testOql(asIterator(pdx1, pdx2), "\n\n\n", 10); + testOql(asIterator(pdx1, pdx2), "\n\n", 1); + } + + private static class DummyUnspportedType { + @Override + public String toString() { + return "Unsupported Indeed"; + } + } + + @Test + public void oqlUnsupportedTypeResponse() throws Exception { + DummyUnspportedType unspported1 = new DummyUnspportedType(); + DummyUnspportedType unspported2 = new DummyUnspportedType(); + + testOql(asIterator(unspported1, unspported2), "Unsuppoted Type\n" + unspported1.toString() + + "\n" + unspported1.toString() + "\n", 10); + } + + private void testOql(Iterator<Object> queryResponseIterator, String expectedOutput, int maxResult) + throws Exception { + + GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties())); + + QueryService mockQueryService = mock(QueryService.class, RETURNS_DEEP_STUBS); + + when(spyGeodeOqlInterpreter.getQueryService()).thenReturn(mockQueryService); + when(spyGeodeOqlInterpreter.getMaxResult()).thenReturn(maxResult); + + @SuppressWarnings("unchecked") + SelectResults<Object> mockResults = mock(SelectResults.class); + + when(mockQueryService.newQuery(eq(OQL_QUERY)).execute()).thenReturn(mockResults); + + when(mockResults.iterator()).thenReturn(queryResponseIterator); + + InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null); + + assertEquals(Code.SUCCESS, interpreterResult.code()); + assertEquals(expectedOutput, interpreterResult.message()); + } + + @Test + public void oqlWithQueryException() throws Exception { + + GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties())); + + when(spyGeodeOqlInterpreter.getExceptionOnConnect()).thenReturn( + new RuntimeException("Test Exception On Connect")); + + InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null); + + assertEquals(Code.ERROR, interpreterResult.code()); + assertEquals("Test Exception On Connect", interpreterResult.message()); + } + + @Test + public void oqlWithExceptionOnConnect() throws Exception { + + GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties())); + + when(spyGeodeOqlInterpreter.getQueryService()).thenThrow( + new RuntimeException("Expected Test Exception!")); + + InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null); + + assertEquals(Code.ERROR, interpreterResult.code()); + assertEquals("Expected Test Exception!", interpreterResult.message()); + } + + @Test + public void testFormType() { + assertEquals(FormType.SIMPLE, new GeodeOqlInterpreter(new Properties()).getFormType()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f04425bc/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 9e5f54e..ecdebdd 100644 --- a/pom.xml +++ b/pom.xml @@ -91,6 +91,7 @@ <module>angular</module> <module>shell</module> <module>hive</module> + <module>geode</module> <module>tajo</module> <module>flink</module> <module>ignite</module> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f04425bc/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 223dc70..e25d3c0 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -394,7 +394,8 @@ public class ZeppelinConfiguration extends XMLConfiguration { + "org.apache.zeppelin.ignite.IgniteInterpreter," + "org.apache.zeppelin.ignite.IgniteSqlInterpreter," + "org.apache.zeppelin.lens.LensInterpreter," - + "org.apache.zeppelin.cassandra.CassandraInterpreter"), + + "org.apache.zeppelin.cassandra.CassandraInterpreter," + + "org.apache.zeppelin.geode.GeodeOqlInterpreter"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
