Repository: zeppelin
Updated Branches:
  refs/heads/master 92f244ef7 -> 794e18971


[ZEPPELIN-3654] - New Hazelcast Jet interpreter

### What is this PR for?
**New Hazelcast Jet interpreter**
Zeppelin has interpreters for different data processing systems like Flink, Spark, Kylin, Ignite, Geode, Beam, etc.
Hazelcast Jet is a general purpose distributed data processing engine, built on top of Hazelcast for stream/batch processing, comparable if not better in terms of performance to the engines supported by Zeppelin, and therefore a perfect candidate for a Zeppelin interpreter.
Part of the interpreter is a set of utility methods that print out Hazelcast data structures (IMap and ICache) and leverage Zeppelin's built-in visualization (%table).
A further addition is to have the Hazelcast Jet DAG of the pipeline displayed as a network graph using the %network display system.

### What type of PR is it?
* Feature

### Todos
* Utility method to display Hazelcast Jet DAG using %network
* Review zeppelin-distribution/src/bin_license/LICENSE file
* Any feedback from reviewers

### What is the Jira issue?
* [ZEPPELIN-3654]

### How should this be tested?
Manually
* Start the Zeppelin server
* Create a new note with the hazelcastjet interpreter binding
* Write a Hazelcast Jet job as per documentation (docs/interpreter/hazelcastjet.md)

Unit tests
* Run unit tests (HazelcastJetInterpreterTest.java)

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? Yes, added Apache License for Hazelcast Jet.
* Is there breaking changes for older versions? No.
* Does this needs documentation? Yes, it has been added to the PR, see docs/interpreter/hazelcastjet.md.

Author: Vincenzo Selvaggio <vselvag...@hotmail.it>

Closes #3157 from selvinsource/hazelcast-jet-interpreter and squashes the following commits:

23a95669f [Vincenzo Selvaggio] Updated hazelcast jet interpreter pom.xml based on new structure.
684a0c1a4 [Vincenzo Selvaggio] Updated hazelcast jet documentation with more details on the graph.
83377ecac [Vincenzo Selvaggio] Added HazelcastJet interpreter utils.
da6941db6 [Vincenzo Selvaggio] Resolved .travis.yml conflict by adding !hazelcastjet.
95642b65f [Vincenzo Selvaggio] Documentation example updated. Some minor fixes. Added LICENSE for Hazelcast Jet.
5f4e3c4c5 [Vincenzo Selvaggio] Hazelcast Jet Interpreter folder restructure.
36b8a624c [Vincenzo Selvaggio] Hazelcast Jet Interpreter first commit!


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/794e1897
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/794e1897
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/794e1897

Branch: refs/heads/master
Commit: 794e1897183a1dc811b73a9d160d064fd85188c2
Parents: 92f244e
Author: Vincenzo Selvaggio <vselvag...@hotmail.it>
Authored: Wed Sep 19 22:39:08 2018 +0100
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Wed Sep 26 17:38:02 2018 +0800

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 conf/interpreter-list                           |   1 +
 docs/_includes/themes/zeppelin/_navigation.html |   1 +
 docs/interpreter/hazelcastjet.md                | 143 +++++++++++++++++++
 hazelcastjet/README.md                          |  19 +++
 hazelcastjet/pom.xml                            |  96 +++++++++++++
 .../hazelcastjet/HazelcastJetInterpreter.java   |  33 +++++
 .../HazelcastJetInterpreterUtils.java           | 131 +++++++++++++++++
 .../src/main/resources/interpreter-setting.json |  14 ++
 .../HazelcastJetInterpreterTest.java            |  99 +++++++++++++
 .../HazelcastJetInterpreterUtilsTest.java       | 135 +++++++++++++++++
 pom.xml                                         |   1 +
 zeppelin-distribution/src/bin_license/LICENSE   |   1 +
 13 files changed, 675 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/794e1897/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 43d3a20..a86cf34 100644
--- a/.travis.yml
+++ b/.travis.yml
@@
-41,7 +41,7 @@ addons: env: global: # Interpreters does not required by zeppelin-server integration tests - - INTERPRETERS='!beam,!hbase,!pig,!jdbc,!file,!ignite,!kylin,!lens,!cassandra,!elasticsearch,!bigquery,!alluxio,!scio,!livy,!groovy,!sap,!java,!geode,!neo4j' + - INTERPRETERS='!beam,!hbase,!pig,!jdbc,!file,!ignite,!kylin,!lens,!cassandra,!elasticsearch,!bigquery,!alluxio,!scio,!livy,!groovy,!sap,!java,!geode,!neo4j,!hazelcastjet' matrix: include: http://git-wip-us.apache.org/repos/asf/zeppelin/blob/794e1897/conf/interpreter-list ---------------------------------------------------------------------- diff --git a/conf/interpreter-list b/conf/interpreter-list index 160e61f..7c746d7 100644 --- a/conf/interpreter-list +++ b/conf/interpreter-list @@ -25,6 +25,7 @@ cassandra org.apache.zeppelin:zeppelin-cassandra_2.11:0.9.0 Cassandr elasticsearch org.apache.zeppelin:zeppelin-elasticsearch:0.9.0 Elasticsearch interpreter file org.apache.zeppelin:zeppelin-file:0.9.0 HDFS file interpreter flink org.apache.zeppelin:zeppelin-flink_2.11:0.9.0 Flink interpreter built with Scala 2.11 +hazelcastjet org.apache.zeppelin:zeppelin-hazelcastjet:0.9.0 Hazelcast Jet interpreter hbase org.apache.zeppelin:zeppelin-hbase:0.9.0 Hbase interpreter ignite org.apache.zeppelin:zeppelin-ignite_2.11:0.9.0 Ignite interpreter built with Scala 2.11 jdbc org.apache.zeppelin:zeppelin-jdbc:0.9.0 Jdbc interpreter http://git-wip-us.apache.org/repos/asf/zeppelin/blob/794e1897/docs/_includes/themes/zeppelin/_navigation.html ---------------------------------------------------------------------- diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html index 6d2e0c9..d5f292d 100644 --- a/docs/_includes/themes/zeppelin/_navigation.html +++ b/docs/_includes/themes/zeppelin/_navigation.html @@ -132,6 +132,7 @@ <li><a href="{{BASE_PATH}}/interpreter/flink.html">Flink</a></li> <li><a href="{{BASE_PATH}}/interpreter/geode.html">Geode</a></li> <li><a 
href="{{BASE_PATH}}/interpreter/groovy.html">Groovy</a></li> + <li><a href="{{BASE_PATH}}/interpreter/hazelcastjet.html">Hazelcast Jet</a></li> <li><a href="{{BASE_PATH}}/interpreter/hbase.html">HBase</a></li> <li><a href="{{BASE_PATH}}/interpreter/hdfs.html">HDFS</a></li> <li><a href="{{BASE_PATH}}/interpreter/hive.html">Hive</a></li> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/794e1897/docs/interpreter/hazelcastjet.md ---------------------------------------------------------------------- diff --git a/docs/interpreter/hazelcastjet.md b/docs/interpreter/hazelcastjet.md new file mode 100644 index 0000000..06ebc88 --- /dev/null +++ b/docs/interpreter/hazelcastjet.md @@ -0,0 +1,143 @@ +--- +layout: page +title: Hazelcast Jet interpreter in Apache Zeppelin +description: Build and execture Hazelcast Jet computation jobs. +group: interpreter +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +{% include JB/setup %} + +# Hazelcast Jet interpreter for Apache Zeppelin + +<div id="toc"></div> + +## Overview +[Hazelcast Jet](https://jet.hazelcast.org) is an open source application embeddable, distributed computing engine for In-Memory Streaming and Fast Batch Processing built on top of Hazelcast In-Memory Data Grid (IMDG). +With Hazelcast IMDG providing storage functionality, Hazelcast Jet performs parallel execution to enable data-intensive applications to operate in near real-time. + +## Why Hazelcast Jet? 
+There are plenty of solutions which can solve some of these issues, so why choose Hazelcast Jet?
+When speed and simplicity is important.
+
+Hazelcast Jet gives you all the infrastructure you need to build a distributed data processing pipeline within one 10Mb Java JAR: processing, storage and clustering.
+
+As it is built on top of Hazelcast IMDG, Hazelcast Jet comes with in-memory operational storage that's available out-of-the box. This storage is partitioned, distributed and replicated across the Hazelcast Jet cluster for capacity and resiliency. It can be used as an input data buffer, to publish the results of a Hazelcast Jet computation, to connect multiple Hazelcast Jet jobs or as a lookup cache for data enrichment.
+
+## How to use the Hazelcast Jet interpreter
+Basically, you can write normal java code. You should write the main method inside a class because the interpreter invoke this main to execute the code. Unlike Zeppelin normal pattern, each paragraph is considered as a separate job, there isn't any relation to any other paragraph. For example, a variable defined in one paragraph cannot be used in another one as each paragraph is a self contained java main class that is executed and the output returned to Zeppelin.
+
+The following is a demonstration of a word count example with the result represented as an Hazelcast IMDG IMap sink and displayed leveraging Zeppelin's built in visualization using the utility method `JavaInterpreterUtils.displayTableFromSimpleMap`.
+
+```java
+%hazelcastjet
+
+import com.hazelcast.jet.Jet;
+import com.hazelcast.jet.JetInstance;
+import com.hazelcast.jet.core.DAG;
+import com.hazelcast.jet.pipeline.Pipeline;
+import com.hazelcast.jet.pipeline.Sinks;
+import com.hazelcast.jet.pipeline.Sources;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.zeppelin.java.JavaInterpreterUtils;
+
+import static com.hazelcast.jet.Traversers.traverseArray;
+import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
+import static com.hazelcast.jet.function.DistributedFunctions.wholeItem;
+
+public class DisplayTableFromSimpleMapExample {
+
+  public static void main(String[] args) {
+
+    // Create the specification of the computation pipeline. Note
+    // it's a pure POJO: no instance of Jet needed to create it.
+    Pipeline p = Pipeline.create();
+    p.drawFrom(Sources.<String>list("text"))
+      .flatMap(word ->
+        traverseArray(word.toLowerCase().split("\\W+")))
+      .filter(word -> !word.isEmpty())
+      .groupingKey(wholeItem())
+      .aggregate(counting())
+      .drainTo(Sinks.map("counts"));
+
+    // Start Jet, populate the input list
+    JetInstance jet = Jet.newJetInstance();
+    try {
+      List<String> text = jet.getList("text");
+      text.add("hello world hello hello world");
+      text.add("world world hello world");
+
+      // Perform the computation
+      jet.newJob(p).join();
+
+      // Diplay the results with Zeppelin %table
+      Map<String, Long> counts = jet.getMap("counts");
+      System.out.println(JavaInterpreterUtils.displayTableFromSimpleMap("Word","Count", counts));
+
+    } finally {
+      Jet.shutdownAll();
+    }
+
+  }
+
+}
+```
+
+The following is a demonstration where the Hazelcast DAG (directed acyclic graph) is displayed as a graph leveraging Zeppelin's built in visualization using the utility method `HazelcastJetInterpreterUtils.displayNetworkFromDAG`.
+This is particularly useful to understand how the high level Pipeline is then converted to the Jet's low-level Core API.
+ +```java +%hazelcastjet + +import com.hazelcast.jet.pipeline.Pipeline; +import com.hazelcast.jet.pipeline.Sinks; +import com.hazelcast.jet.pipeline.Sources; + +import org.apache.zeppelin.hazelcastjet.HazelcastJetInterpreterUtils; + +import static com.hazelcast.jet.Traversers.traverseArray; +import static com.hazelcast.jet.aggregate.AggregateOperations.counting; +import static com.hazelcast.jet.function.DistributedFunctions.wholeItem; + +public class DisplayNetworkFromDAGExample { + + public static void main(String[] args) { + + // Create the specification of the computation pipeline. Note + // it's a pure POJO: no instance of Jet needed to create it. + Pipeline p = Pipeline.create(); + p.drawFrom(Sources.<String>list("text")) + .flatMap(word -> + traverseArray(word.toLowerCase().split("\\W+"))).setName("flat traversing") + .filter(word -> !word.isEmpty()) + .groupingKey(wholeItem()) + .aggregate(counting()) + .drainTo(Sinks.map("counts")); + + // Diplay the results with Zeppelin %network + System.out.println(HazelcastJetInterpreterUtils.displayNetworkFromDAG(p.toDag())); + + } + +} +``` + +Note +- By clicking on a node of the graph, the node type is displayed (either Source, Sink or Transform). This is also visually represented with colors (Sources and Sinks are blue, Transforms are orange). +- By clicking on an edge of the graph, the following details are shown: routing (UNICAST, PARTITIONED, ISOLATED, BROADCAST), distributed (true or false), priority (int). http://git-wip-us.apache.org/repos/asf/zeppelin/blob/794e1897/hazelcastjet/README.md ---------------------------------------------------------------------- diff --git a/hazelcastjet/README.md b/hazelcastjet/README.md new file mode 100644 index 0000000..a7c3800 --- /dev/null +++ b/hazelcastjet/README.md @@ -0,0 +1,19 @@ +# Overview +Hazelcast Jet interpreter for Apache Zeppelin + +# Architecture +Current interpreter implementation supports the static REPL. 
It compiles the code in memory, execute it and redirect the output to Zeppelin. + +### Technical overview + + * Upon starting an interpreter, an instance of `JavaCompiler` is created. + + * When the user runs commands with java, the `JavaParser` go through the code to get a class that contains the main method. + + * Then it replaces the class name with random class name to avoid overriding while compilation. It creates new out & err stream to get the data in new stream instead of the console, to redirect output to Zeppelin. + + * If there is any error during compilation, it can catch and redirect to Zeppelin. + + * `JavaInterpreterUtils` contains useful methods to print out Java collections and leverage Zeppelin's built in visualization. + + * `HazelcastJetInterpreterUtils` contains useful methods to print out Hazelcast specific classes (such as DAG) and leverage Zeppelin's built in visualization. http://git-wip-us.apache.org/repos/asf/zeppelin/blob/794e1897/hazelcastjet/pom.xml ---------------------------------------------------------------------- diff --git a/hazelcastjet/pom.xml b/hazelcastjet/pom.xml new file mode 100644 index 0000000..2ea8591 --- /dev/null +++ b/hazelcastjet/pom.xml @@ -0,0 +1,96 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>zeppelin-interpreter-parent</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.9.0-SNAPSHOT</version> + <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath> + </parent> + + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-hazelcastjet</artifactId> + <packaging>jar</packaging> + <version>0.9.0-SNAPSHOT</version> + <name>Zeppelin: Hazelcast Jet interpreter</name> + + <properties> + <!--library versions--> + <interpreter.name>hazelcastjet</interpreter.name> + </properties> + + <dependencies> + + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>zeppelin-interpreter-api</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-java</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.hazelcast.jet</groupId> + <artifactId>hazelcast-jet</artifactId> + <version>0.6.1</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-enforcer-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-shade-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + 
<artifactId>maven-checkstyle-plugin</artifactId> + <configuration> + <skip>false</skip> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/794e1897/hazelcastjet/src/main/java/org/apache/zeppelin/hazelcastjet/HazelcastJetInterpreter.java ---------------------------------------------------------------------- diff --git a/hazelcastjet/src/main/java/org/apache/zeppelin/hazelcastjet/HazelcastJetInterpreter.java b/hazelcastjet/src/main/java/org/apache/zeppelin/hazelcastjet/HazelcastJetInterpreter.java new file mode 100644 index 0000000..00d2418 --- /dev/null +++ b/hazelcastjet/src/main/java/org/apache/zeppelin/hazelcastjet/HazelcastJetInterpreter.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.hazelcastjet; + +import org.apache.zeppelin.java.JavaInterpreter; + +import java.util.Properties; + +/** + * Hazelcast Jet interpreter + */ +public class HazelcastJetInterpreter extends JavaInterpreter { + + public HazelcastJetInterpreter(Properties property) { + super(property); + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/794e1897/hazelcastjet/src/main/java/org/apache/zeppelin/hazelcastjet/HazelcastJetInterpreterUtils.java ---------------------------------------------------------------------- diff --git a/hazelcastjet/src/main/java/org/apache/zeppelin/hazelcastjet/HazelcastJetInterpreterUtils.java b/hazelcastjet/src/main/java/org/apache/zeppelin/hazelcastjet/HazelcastJetInterpreterUtils.java new file mode 100644 index 0000000..c8bcd6d --- /dev/null +++ b/hazelcastjet/src/main/java/org/apache/zeppelin/hazelcastjet/HazelcastJetInterpreterUtils.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.hazelcastjet; + +import com.google.gson.Gson; +import com.hazelcast.jet.core.DAG; +import org.apache.zeppelin.interpreter.graph.GraphResult; +import org.apache.zeppelin.tabledata.Node; +import org.apache.zeppelin.tabledata.Relationship; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Hazelcast Jet interpreter utility methods + */ +public class HazelcastJetInterpreterUtils { + + private static final Gson gson = new Gson(); + + /** + * Convert an Hazelcast Jet DAG to %network display system + * to leverage Zeppelin's built in visualization + * @param dag DAG object to convert + * @return Zeppelin %network + */ + public static String displayNetworkFromDAG(DAG dag){ + GraphResult.Graph graph = new GraphResult.Graph(); + graph.setDirected(true); + + // Map between vertex name (from DAG) and node id (for graph) + Map<String, Integer> nodeIds = new HashMap<>(); + + // Create graph nodes based on dag vertices + List<Node> nodes = new ArrayList<>(); + AtomicInteger nodeCount = new AtomicInteger(1); + dag.forEach(v -> { + // Assign an index to the vertex name + nodeIds.put(v.getName(), nodeCount.getAndIncrement()); + Node node = new Node(); + node.setId(nodeIds.get(v.getName())); + // Define node label from vertex name + if (v.getName().toLowerCase().contains("sink")) + node.setLabel("Sink"); + else if (v.getName().toLowerCase().contains("source")) + node.setLabel("Source"); + else + node.setLabel("Transform"); + // Add node description + Map<String, Object> data = new HashMap<>(); + data.put("description", v.getName()); + node.setData(data); + nodes.add(node); + }); + graph.setNodes(nodes); + + // Set labels colors + Map<String, String> labels = new HashMap<>(); + labels.put("Source", "#00317c"); + labels.put("Transform", "#ff7600"); + labels.put("Sink", "#00317c"); + graph.setLabels(labels); + + // Map between edge name 
(from DAG) and relationship id (for graph) + Map<String, Integer> edgeIds = new HashMap<>(); + + // Create graph relationships + List<Relationship> rels = new ArrayList<>(); + AtomicInteger relCount = new AtomicInteger(1); + dag.forEach(v -> { + dag.getInboundEdges(v.getName()).forEach(e -> { + String edgeName = e.getSourceName() + " to " + e.getDestName(); + if (edgeIds.get(edgeName) == null) { + // Assign an index to the edge name if not found + edgeIds.put(edgeName, relCount.getAndIncrement()); + Relationship rel = new Relationship(); + rel.setId(edgeIds.get(edgeName)); + rel.setSource(nodeIds.get(e.getSourceName())); + rel.setTarget(nodeIds.get(e.getDestName())); + // Add rel data + Map<String, Object> data = new HashMap<>(); + data.put("routing", e.getRoutingPolicy().toString()); + data.put("priority", e.getPriority()); + data.put("distributed", e.isDistributed()); + rel.setData(data); + rels.add(rel); + } + }); + dag.getOutboundEdges(v.getName()).forEach(e -> { + String edgeName = e.getSourceName() + " to " + e.getDestName(); + if (edgeIds.get(edgeName) == null) { + // Assign an index to the edge name if not found + edgeIds.put(edgeName, relCount.getAndIncrement()); + Relationship rel = new Relationship(); + rel.setId(edgeIds.get(edgeName)); + rel.setSource(nodeIds.get(e.getSourceName())); + rel.setTarget(nodeIds.get(e.getDestName())); + // Add rel data + Map<String, Object> data = new HashMap<>(); + data.put("routing", e.getRoutingPolicy().toString()); + data.put("priority", e.getPriority()); + data.put("distributed", e.isDistributed()); + rel.setData(data); + rels.add(rel); + } + }); + }); + graph.setEdges(rels); + + return "%network " + gson.toJson(graph); + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/794e1897/hazelcastjet/src/main/resources/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/hazelcastjet/src/main/resources/interpreter-setting.json 
b/hazelcastjet/src/main/resources/interpreter-setting.json new file mode 100644 index 0000000..938bf50 --- /dev/null +++ b/hazelcastjet/src/main/resources/interpreter-setting.json @@ -0,0 +1,14 @@ +[ + { + "group": "hazelcastjet", + "name": "hazelcastjet", + "className": "org.apache.zeppelin.hazelcastjet.HazelcastJetInterpreter", + "defaultInterpreter": true, + "properties": { + }, + "editor": { + "language": "java", + "editOnDblClick": false + } + } +] http://git-wip-us.apache.org/repos/asf/zeppelin/blob/794e1897/hazelcastjet/src/test/java/org/apache/zeppelin/hazelcastjet/HazelcastJetInterpreterTest.java ---------------------------------------------------------------------- diff --git a/hazelcastjet/src/test/java/org/apache/zeppelin/hazelcastjet/HazelcastJetInterpreterTest.java b/hazelcastjet/src/test/java/org/apache/zeppelin/hazelcastjet/HazelcastJetInterpreterTest.java new file mode 100644 index 0000000..b646941 --- /dev/null +++ b/hazelcastjet/src/test/java/org/apache/zeppelin/hazelcastjet/HazelcastJetInterpreterTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.hazelcastjet; + +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + +/** + * HazelcastJetInterpreterTest + */ +public class HazelcastJetInterpreterTest { + + private static HazelcastJetInterpreter jet; + private static InterpreterContext context; + + @BeforeClass + public static void setUp() { + Properties p = new Properties(); + jet = new HazelcastJetInterpreter(p); + jet.open(); + context = InterpreterContext.builder().build(); + } + + @AfterClass + public static void tearDown() { + jet.close(); + } + + @Test + public void testStaticRepl() { + + StringWriter writer = new StringWriter(); + PrintWriter out = new PrintWriter(writer); + out.println("public class HelloWorld {"); + out.println(" public static void main(String args[]) {"); + out.println(" System.out.println(\"This is in another java file\");"); + out.println(" }"); + out.println("}"); + out.close(); + + InterpreterResult res = jet.interpret(writer.toString(), context); + + assertEquals(InterpreterResult.Code.SUCCESS, res.code()); + assertEquals(InterpreterResult.Type.TEXT, res.message().get(0).getType()); + } + + @Test + public void testStaticReplWithoutMain() { + + StringBuffer sourceCode = new StringBuffer(); + sourceCode.append("package org.mdkt;\n"); + sourceCode.append("public class HelloClass {\n"); + sourceCode.append(" public String hello() { return \"hello\"; }"); + sourceCode.append("}"); + InterpreterResult res = jet.interpret(sourceCode.toString(), context); + assertEquals(InterpreterResult.Code.ERROR, res.code()); + } + + @Test + public void testStaticReplWithSyntaxError() { + + StringWriter writer = new StringWriter(); + PrintWriter out = new PrintWriter(writer); + 
out.println("public class HelloWorld {"); + out.println(" public static void main(String args[]) {"); + out.println(" System.out.prin(\"This is in another java file\");"); + out.println(" }"); + out.println("}"); + out.close(); + InterpreterResult res = jet.interpret(writer.toString(), context); + + assertEquals(InterpreterResult.Code.ERROR, res.code()); + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/794e1897/hazelcastjet/src/test/java/org/apache/zeppelin/hazelcastjet/HazelcastJetInterpreterUtilsTest.java ---------------------------------------------------------------------- diff --git a/hazelcastjet/src/test/java/org/apache/zeppelin/hazelcastjet/HazelcastJetInterpreterUtilsTest.java b/hazelcastjet/src/test/java/org/apache/zeppelin/hazelcastjet/HazelcastJetInterpreterUtilsTest.java new file mode 100644 index 0000000..600da7a --- /dev/null +++ b/hazelcastjet/src/test/java/org/apache/zeppelin/hazelcastjet/HazelcastJetInterpreterUtilsTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.hazelcastjet; + +import com.hazelcast.jet.pipeline.Pipeline; +import com.hazelcast.jet.pipeline.Sinks; +import com.hazelcast.jet.pipeline.Sources; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Properties; + +import static com.hazelcast.jet.Traversers.traverseArray; +import static com.hazelcast.jet.aggregate.AggregateOperations.counting; +import static com.hazelcast.jet.function.DistributedFunctions.wholeItem; +import static org.junit.Assert.assertEquals; + +public class HazelcastJetInterpreterUtilsTest { + + private static final String NETWORK_RESULT_1 = "%network " + + "{\"nodes\":[" + + "{\"id\":1,\"data\":{\"description\":\"listSource(text)\"},\"label\":\"Source\"}," + + "{\"id\":2,\"data\":{\"description\":\"flat traversing\"},\"label\":\"Transform\"}," + + "{\"id\":3,\"data\":{\"description\":\"filter\"},\"label\":\"Transform\"}," + + "{\"id\":4,\"data\":{\"description\":\"group-and-aggregate-step1\"}," + + "\"label\":\"Transform\"}," + + "{\"id\":5,\"data\":{\"description\":\"group-and-aggregate-step2\"}," + + "\"label\":\"Transform\"}," + + "{\"id\":6,\"data\":{\"description\":\"mapSink(counts)\"},\"label\":\"Sink\"}]," + + "\"edges\":[" + + "{\"source\":1,\"target\":2,\"id\":1,\"data\":{\"routing\":\"UNICAST\"," + + "\"distributed\":false,\"priority\":0}}," + + "{\"source\":2,\"target\":3,\"id\":2,\"data\":{\"routing\":\"UNICAST\"," + + "\"distributed\":false,\"priority\":0}}," + + "{\"source\":3,\"target\":4,\"id\":3,\"data\":{\"routing\":\"PARTITIONED\"," + + "\"distributed\":false,\"priority\":0}}," + + "{\"source\":4,\"target\":5,\"id\":4,\"data\":{\"routing\":\"PARTITIONED\"," + + "\"distributed\":true,\"priority\":0}}," + + 
"{\"source\":5,\"target\":6,\"id\":5,\"data\":{\"routing\":\"UNICAST\"," + + "\"distributed\":false,\"priority\":0}}]," + + "\"labels\":{\"Sink\":\"#00317c\",\"Transform\":\"#ff7600\",\"Source\":\"#00317c\"}," + + "\"directed\":true}"; + + private static HazelcastJetInterpreter jet; + private static InterpreterContext context; + + @BeforeClass + public static void setUp() { + Properties p = new Properties(); + jet = new HazelcastJetInterpreter(p); + jet.open(); + context = InterpreterContext.builder().build(); + } + + @AfterClass + public static void tearDown() { + jet.close(); + } + + @Test + public void testDisplayNetworkFromDAGUtil() { + + Pipeline p = Pipeline.create(); + p.drawFrom(Sources.<String>list("text")) + .flatMap(word -> + traverseArray(word.toLowerCase().split("\\W+"))).setName("flat traversing") + .filter(word -> !word.isEmpty()) + .groupingKey(wholeItem()) + .aggregate(counting()) + .drainTo(Sinks.map("counts")); + + assertEquals( + NETWORK_RESULT_1, + HazelcastJetInterpreterUtils.displayNetworkFromDAG(p.toDag()) + ); + + } + + @Test + public void testStaticReplWithdisplayNetworkFromDAGUtilReturnNetworkType() { + + StringWriter writer = new StringWriter(); + PrintWriter out = new PrintWriter(writer); + out.println("import com.hazelcast.jet.pipeline.Pipeline;"); + out.println("import com.hazelcast.jet.pipeline.Sinks;"); + out.println("import com.hazelcast.jet.pipeline.Sources;"); + out.println("import org.apache.zeppelin.hazelcastjet.HazelcastJetInterpreterUtils;"); + out.println("import static com.hazelcast.jet.Traversers.traverseArray;"); + out.println("import static com.hazelcast.jet.aggregate.AggregateOperations.counting;"); + out.println("import static com.hazelcast.jet.function.DistributedFunctions.wholeItem;"); + out.println("public class HelloWorld {"); + out.println(" public static void main(String args[]) {"); + out.println(" Pipeline p = Pipeline.create();"); + out.println(" p.drawFrom(Sources.<String>list(\"text\"))"); + out.println(" 
.flatMap(word ->");
+    out.println(" traverseArray(word.toLowerCase().split(\"\\\\W+\")))" +
+        ".setName(\"flat traversing\")");
+    out.println(" .filter(word -> !word.isEmpty())");
+    out.println(" .groupingKey(wholeItem())");
+    out.println(" .aggregate(counting())");
+    out.println(" .drainTo(Sinks.map(\"counts\"));");
+    out.println(" System.out.println(HazelcastJetInterpreterUtils" +
+        ".displayNetworkFromDAG(p.toDag()));");
+    out.println(" }");
+    out.println("}");
+    out.close();
+
+    InterpreterResult res = jet.interpret(writer.toString(), context);
+
+    assertEquals(InterpreterResult.Code.SUCCESS, res.code());
+    assertEquals(InterpreterResult.Type.NETWORK, res.message().get(0).getType());
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/794e1897/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bdc3334..e416d55 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,6 +82,7 @@
     <module>scalding</module>
     <module>java</module>
     <module>beam</module>
+    <module>hazelcastjet</module>
     <module>geode</module>
     <module>zeppelin-web</module>
     <module>zeppelin-server</module>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/794e1897/zeppelin-distribution/src/bin_license/LICENSE
----------------------------------------------------------------------
diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE
index caa3ba4..d9b0797 100644
--- a/zeppelin-distribution/src/bin_license/LICENSE
+++ b/zeppelin-distribution/src/bin_license/LICENSE
@@ -218,6 +218,7 @@ The following components are provided under Apache License.
     (Apache 2.0) frontend-plugin-core 1.3 (com.github.eirslett:frontend-plugin-core) - https://github.com/eirslett/frontend-maven-plugin/blob/frontend-plugins-1.3/LICENSE
     (Apache 2.0) mongo-java-driver 3.4.1 (org.mongodb:mongo-java-driver:3.4.1) - https://github.com/mongodb/mongo-java-driver/blob/master/LICENSE.txt
     (Apache 2.0) Neo4j Java Driver (https://github.com/neo4j/neo4j-java-driver) - https://github.com/neo4j/neo4j-java-driver/blob/1.4.3/LICENSE.txt
+    (Apache 2.0) Hazelcast Jet (http://jet.hazelcast.org) - https://github.com/hazelcast/hazelcast-jet/blob/master/LICENSE

 ========================================================================
 MIT licenses