Repository: incubator-zeppelin
Updated Branches:
refs/heads/master b45663d22 -> 554e78c88
[ZEPPELIN-689] Add AngularJS z object and z.angularBind()
### What is this PR for?
Add client-side `z` object with method `angularBind()`
Leemoonsoo
Compared to the original implementation, I have simplified a lot.
Now you can only bind an Angular variable to one unique scope, which is the
**paragraph**. I removed the note scope and also removed the `interpreter`
parameter.
Indeed, when passing a `paragraphId`, on the server side we can retrieve the
`Paragraph` object with the `noteId` + `paragraphId`, so we can know which
interpreter is currently being used.
The signature of the `angularBind(varName, value, paragraphId)` method has also
been greatly simplified.
_This is a sub-task of epic **[ZEPPELIN-635]**_
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Code review
* [ ] - Simple test
### Is there a relevant Jira issue?
**[ZEPPELIN-689]**
### How should this be tested?
* `git fetch origin pull/740/head:AngularJSBind`
* `git checkout AngularJSBind`
* `mvn clean package -DskipTests`
* `bin/zeppelin-daemon.sh restart`
* Create a new note
* In the first paragraph, put the following code
```html
%angular
<form class="form-inline">
<div class="form-group">
<label for="superheroId">Super Hero: </label>
<input type="text" class="form-control" id="superheroId"
placeholder="Superhero name ..." ng-model="superhero"></input>
</div>
<button type="submit" class="btn btn-primary"
ng-click="z.angularBind('superhero', superhero, 'PUT_HERE_PARAGRAPH_ID')">
Bind Angular</button>
</form>
```
* Create a second paragraph with the following code:
```scala
z.angular("superhero")
```
* Retrieve the paragraphId of the second paragraph
* In the first paragraph, replace the text **PUT_HERE_PARAGRAPH_ID** by the
correct paragraph id
* In the input text, put "Superman" and click on the **Bind Angular** button
* Execute the second paragraph to see that the _superhero_ variable is now set
to **Superman**
### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? --> **No**
* Are there breaking changes for older versions? --> **No**
* Does this need documentation? --> **Yes**
[ZEPPELIN-635]: https://issues.apache.org/jira/browse/ZEPPELIN-635
[ZEPPELIN-689]: https://issues.apache.org/jira/browse/ZEPPELIN-689
Author: DuyHai DOAN <[email protected]>
Closes #740 from doanduyhai/ZEPPELIN-689 and squashes the following commits:
4aa83f5 [DuyHai DOAN] [ZEPPELIN-689] Implement z.angularBind() function
aabc1bc [DuyHai DOAN] [ZEPPELIN-689] ZeppelinContext angular() method should
look for variable using the paragraph scope then note scope
954b8f6 [DuyHai DOAN] [ZEPPELIN-689] Make AngularObject constructor public
because of serialization issue
3cac8d2 [DuyHai DOAN] [ZEPPELIN-689] Add Thrift RPC method angularRegistryPush()
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit:
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/554e78c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/554e78c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/554e78c8
Branch: refs/heads/master
Commit: 554e78c8886a2c14d737314e33bc376dda0785c8
Parents: b45663d
Author: DuyHai DOAN <[email protected]>
Authored: Tue Feb 23 11:01:31 2016 +0100
Committer: Lee moon soo <[email protected]>
Committed: Fri Mar 18 09:23:15 2016 -0700
----------------------------------------------------------------------
pom.xml | 7 +
.../apache/zeppelin/spark/ZeppelinContext.java | 6 +-
.../apache/zeppelin/display/AngularObject.java | 42 +-
.../zeppelin/display/AngularObjectRegistry.java | 8 +
.../zeppelin/interpreter/InterpreterGroup.java | 9 +
.../remote/RemoteAngularObjectRegistry.java | 4 +-
.../interpreter/remote/RemoteInterpreter.java | 42 +-
.../remote/RemoteInterpreterServer.java | 12 +
.../thrift/RemoteInterpreterContext.java | 19 +-
.../thrift/RemoteInterpreterEvent.java | 19 +-
.../thrift/RemoteInterpreterEventType.java | 22 +-
.../thrift/RemoteInterpreterResult.java | 19 +-
.../thrift/RemoteInterpreterService.java | 756 ++++++++++++++++++-
.../main/thrift/RemoteInterpreterService.thrift | 4 +-
.../remote/RemoteInterpreterTest.java | 32 +
zeppelin-server/pom.xml | 5 +
.../org/apache/zeppelin/socket/Message.java | 15 +
.../apache/zeppelin/socket/NotebookServer.java | 113 ++-
.../zeppelin/display/AngularObjectBuilder.java | 26 +
.../zeppelin/socket/NotebookServerTest.java | 105 +++
.../notebook/paragraph/paragraph.controller.js | 13 +-
.../websocketEvents/websocketMsg.service.js | 12 +
.../org/apache/zeppelin/notebook/Paragraph.java | 13 +
23 files changed, 1186 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f566cca..5a14040 100755
--- a/pom.xml
+++ b/pom.xml
@@ -194,6 +194,13 @@
<version>2.4</version>
</dependency>
+
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <version>3.2.1</version>
+ </dependency>
+
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
index 88094b5..692f240 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
@@ -371,7 +371,11 @@ public class ZeppelinContext {
AngularObjectRegistry registry =
interpreterContext.getAngularObjectRegistry();
String noteId = interpreterContext.getNoteId();
// try get local object
- AngularObject ao = registry.get(name, interpreterContext.getNoteId(),
null);
+ AngularObject paragraphAo = registry.get(name, noteId,
interpreterContext.getParagraphId());
+ AngularObject noteAo = registry.get(name, noteId, null);
+
+ AngularObject ao = paragraphAo != null ? paragraphAo : noteAo;
+
if (ao == null) {
// then global object
ao = registry.get(name, null, null);
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java
----------------------------------------------------------------------
diff --git
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java
index 4b0c3e9..3ef1993 100644
---
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java
+++
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.display;
import java.util.LinkedList;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.ExecutorService;
import org.apache.zeppelin.scheduler.ExecutorFactory;
@@ -44,6 +45,15 @@ public class AngularObject<T> {
private String paragraphId; // paragraphId belongs to. null for notebook
scope
/**
+ * Public constructor, neccessary for the deserialization when using Thrift
angularRegistryPush()
+ * Without public constructor, GSON library will instantiate the
AngularObject using
+ * serialization so the <strong>watchers</strong> list won't be initialized
and will throw
+ * NullPointerException the first time it is accessed
+ */
+ public AngularObject() {
+ }
+
+ /**
* To create new AngularObject, use AngularObjectRegistry.add()
*
* @param name name of object
@@ -111,17 +121,17 @@ public class AngularObject<T> {
@Override
public boolean equals(Object o) {
- if (o instanceof AngularObject) {
- AngularObject ao = (AngularObject) o;
- if (noteId == null && ao.noteId == null ||
- (noteId != null && ao.noteId != null && noteId.equals(ao.noteId))) {
- if (paragraphId == null && ao.paragraphId == null ||
- (paragraphId != null && ao.paragraphId != null &&
paragraphId.equals(ao.paragraphId))) {
- return name.equals(ao.name);
- }
- }
- }
- return false;
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ AngularObject<?> that = (AngularObject<?>) o;
+ return Objects.equals(name, that.name) &&
+ Objects.equals(noteId, that.noteId) &&
+ Objects.equals(paragraphId, that.paragraphId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, noteId, paragraphId);
}
/**
@@ -232,4 +242,14 @@ public class AngularObject<T> {
}
}
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("AngularObject{");
+ sb.append("noteId='").append(noteId).append('\'');
+ sb.append(", paragraphId='").append(paragraphId).append('\'');
+ sb.append(", object=").append(object);
+ sb.append(", name='").append(name).append('\'');
+ sb.append('}');
+ return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java
----------------------------------------------------------------------
diff --git
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java
index cf360af..af05d78 100644
---
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java
+++
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java
@@ -246,4 +246,12 @@ public class AngularObjectRegistry {
public String getInterpreterGroupId() {
return interpreterId;
}
+
+ public Map<String, Map<String, AngularObject>> getRegistry() {
+ return registry;
+ }
+
+ public void setRegistry(Map<String, Map<String, AngularObject>> registry) {
+ this.registry = registry;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
----------------------------------------------------------------------
diff --git
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
index 3ed988a..b5d5863 100644
---
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
+++
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -48,6 +48,7 @@ public class InterpreterGroup extends
ConcurrentHashMap<String, List<Interpreter
AngularObjectRegistry angularObjectRegistry;
RemoteInterpreterProcess remoteInterpreterProcess; // attached remote
interpreter process
ResourcePool resourcePool;
+ boolean angularRegistryPushed = false;
// map [notebook session, Interpreters in the group], to support per note
session interpreters
//Map<String, List<Interpreter>> interpreters = new ConcurrentHashMap<String,
@@ -254,4 +255,12 @@ public class InterpreterGroup extends
ConcurrentHashMap<String, List<Interpreter
public ResourcePool getResourcePool() {
return resourcePool;
}
+
+ public boolean isAngularRegistryPushed() {
+ return angularRegistryPushed;
+ }
+
+ public void setAngularRegistryPushed(boolean angularRegistryPushed) {
+ this.angularRegistryPushed = angularRegistryPushed;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
----------------------------------------------------------------------
diff --git
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
index 3789292..b80a252 100644
---
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
+++
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
@@ -63,7 +63,7 @@ public class RemoteAngularObjectRegistry extends
AngularObjectRegistry {
Gson gson = new Gson();
RemoteInterpreterProcess remoteInterpreterProcess =
getRemoteInterpreterProcess();
if (!remoteInterpreterProcess.isRunning()) {
- return null;
+ return super.add(name, o, noteId, paragraphId, true);
}
Client client = null;
@@ -97,7 +97,7 @@ public class RemoteAngularObjectRegistry extends
AngularObjectRegistry {
paragraphId) {
RemoteInterpreterProcess remoteInterpreterProcess =
getRemoteInterpreterProcess();
if (!remoteInterpreterProcess.isRunning()) {
- return null;
+ return super.remove(name, noteId, paragraphId);
}
Client client = null;
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 535d3df..137b605 100644
---
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -20,6 +20,8 @@ package org.apache.zeppelin.interpreter.remote;
import java.util.*;
import org.apache.thrift.TException;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
@@ -128,10 +130,11 @@ public class RemoteInterpreter extends Interpreter {
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
- interpreterProcess.reference(getInterpreterGroup());
+ final InterpreterGroup interpreterGroup = getInterpreterGroup();
+ interpreterProcess.reference(interpreterGroup);
interpreterProcess.setMaxPoolSize(
Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
- String groupId = getInterpreterGroup().getId();
+ String groupId = interpreterGroup.getId();
synchronized (interpreterProcess) {
Client client = null;
@@ -146,7 +149,14 @@ public class RemoteInterpreter extends Interpreter {
logger.info("Create remote interpreter {}", getClassName());
property.put("zeppelin.interpreter.localRepo", localRepoPath);
client.createInterpreter(groupId, noteId,
- getClassName(), (Map) property);
+ getClassName(), (Map) property);
+
+ // Push angular object loaded from JSON file to remote interpreter
+ if (!interpreterGroup.isAngularRegistryPushed()) {
+ pushAngularObjectRegistryToRemote(client);
+ interpreterGroup.setAngularRegistryPushed(true);
+ }
+
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
@@ -387,4 +397,30 @@ public class RemoteInterpreter extends Interpreter {
Type.valueOf(result.getType()),
result.getMsg());
}
+
+ /**
+ * Push local angular object registry to
+ * remote interpreter. This method should be
+ * call ONLY inside the init() method
+ * @param client
+ * @throws TException
+ */
+ void pushAngularObjectRegistryToRemote(Client client) throws TException {
+ final AngularObjectRegistry angularObjectRegistry =
this.getInterpreterGroup()
+ .getAngularObjectRegistry();
+
+ if (angularObjectRegistry != null && angularObjectRegistry.getRegistry()
!= null) {
+ final Map<String, Map<String, AngularObject>> registry =
angularObjectRegistry
+ .getRegistry();
+
+ logger.info("Push local angular object registry from ZeppelinServer to" +
+ " remote interpreter group {}",
this.getInterpreterGroup().getId());
+
+ final java.lang.reflect.Type registryType = new TypeToken<Map<String,
+ Map<String, AngularObject>>>() {}.getType();
+
+ Gson gson = new Gson();
+ client.angularRegistryPush(gson.toJson(registry, registryType));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 84477ea..6e369c0 100644
---
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -684,4 +684,16 @@ public class RemoteInterpreterServer
}
}
}
+
+ @Override
+ public void angularRegistryPush(String registryAsString) throws TException {
+ try {
+ Map<String, Map<String, AngularObject>> deserializedRegistry = gson
+ .fromJson(registryAsString,
+ new TypeToken<Map<String, Map<String, AngularObject>>>()
{ }.getType());
+
interpreterGroup.getAngularObjectRegistry().setRegistry(deserializedRegistry);
+ } catch (Exception e) {
+ logger.info("Exception in RemoteInterpreterServer while
angularRegistryPush, nolock", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
----------------------------------------------------------------------
diff --git
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
index d3087c7..889e45d 100644
---
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
+++
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
@@ -1,21 +1,4 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
* Autogenerated by Thrift Compiler (0.9.2)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
@@ -51,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date =
"2016-3-4")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date =
"2016-3-17")
public class RemoteInterpreterContext implements
org.apache.thrift.TBase<RemoteInterpreterContext,
RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable,
Comparable<RemoteInterpreterContext> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new
org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
----------------------------------------------------------------------
diff --git
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
index c7d3ffc..c89a287 100644
---
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
+++
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
@@ -1,21 +1,4 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
* Autogenerated by Thrift Compiler (0.9.2)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
@@ -51,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date =
"2016-3-4")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date =
"2016-3-17")
public class RemoteInterpreterEvent implements
org.apache.thrift.TBase<RemoteInterpreterEvent,
RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable,
Comparable<RemoteInterpreterEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new
org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
----------------------------------------------------------------------
diff --git
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
index 7cb7963..8db330a 100644
---
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
+++
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
@@ -1,21 +1,4 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
* Autogenerated by Thrift Compiler (0.9.2)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
@@ -37,7 +20,8 @@ public enum RemoteInterpreterEventType implements
org.apache.thrift.TEnum {
RESOURCE_POOL_GET_ALL(6),
RESOURCE_GET(7),
OUTPUT_APPEND(8),
- OUTPUT_UPDATE(9);
+ OUTPUT_UPDATE(9),
+ ANGULAR_REGISTRY_PUSH(10);
private final int value;
@@ -76,6 +60,8 @@ public enum RemoteInterpreterEventType implements
org.apache.thrift.TEnum {
return OUTPUT_APPEND;
case 9:
return OUTPUT_UPDATE;
+ case 10:
+ return ANGULAR_REGISTRY_PUSH;
default:
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
----------------------------------------------------------------------
diff --git
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
index a51d6d7..7ed20f6 100644
---
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
+++
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
@@ -1,21 +1,4 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
* Autogenerated by Thrift Compiler (0.9.2)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
@@ -51,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date =
"2016-3-4")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date =
"2016-3-17")
public class RemoteInterpreterResult implements
org.apache.thrift.TBase<RemoteInterpreterResult,
RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable,
Comparable<RemoteInterpreterResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new
org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
----------------------------------------------------------------------
diff --git
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
index 47a86a1..3f26b79 100644
---
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
+++
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
@@ -1,21 +1,4 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
* Autogenerated by Thrift Compiler (0.9.2)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
@@ -51,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date =
"2016-3-4")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date =
"2016-3-17")
public class RemoteInterpreterService {
public interface Iface {
@@ -94,6 +77,8 @@ public class RemoteInterpreterService {
public void angularObjectRemove(String name, String noteId, String
paragraphId) throws org.apache.thrift.TException;
+ public void angularRegistryPush(String registry) throws
org.apache.thrift.TException;
+
}
public interface AsyncIface {
@@ -136,6 +121,8 @@ public class RemoteInterpreterService {
public void angularObjectRemove(String name, String noteId, String
paragraphId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws
org.apache.thrift.TException;
+ public void angularRegistryPush(String registry,
org.apache.thrift.async.AsyncMethodCallback resultHandler) throws
org.apache.thrift.TException;
+
}
public static class Client extends org.apache.thrift.TServiceClient
implements Iface {
@@ -592,6 +579,26 @@ public class RemoteInterpreterService {
return;
}
+ public void angularRegistryPush(String registry) throws
org.apache.thrift.TException
+ {
+ send_angularRegistryPush(registry);
+ recv_angularRegistryPush();
+ }
+
+ public void send_angularRegistryPush(String registry) throws
org.apache.thrift.TException
+ {
+ angularRegistryPush_args args = new angularRegistryPush_args();
+ args.setRegistry(registry);
+ sendBase("angularRegistryPush", args);
+ }
+
+ public void recv_angularRegistryPush() throws org.apache.thrift.TException
+ {
+ angularRegistryPush_result result = new angularRegistryPush_result();
+ receiveBase(result, "angularRegistryPush");
+ return;
+ }
+
}
public static class AsyncClient extends org.apache.thrift.async.TAsyncClient
implements AsyncIface {
public static class Factory implements
org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
@@ -1299,6 +1306,38 @@ public class RemoteInterpreterService {
}
}
+ public void angularRegistryPush(String registry,
org.apache.thrift.async.AsyncMethodCallback resultHandler) throws
org.apache.thrift.TException {
+ checkReady();
+ angularRegistryPush_call method_call = new
angularRegistryPush_call(registry, resultHandler, this, ___protocolFactory,
___transport);
+ this.___currentMethod = method_call;
+ ___manager.call(method_call);
+ }
+
+ public static class angularRegistryPush_call extends
org.apache.thrift.async.TAsyncMethodCall {
+ private String registry;
+ public angularRegistryPush_call(String registry,
org.apache.thrift.async.AsyncMethodCallback resultHandler,
org.apache.thrift.async.TAsyncClient client,
org.apache.thrift.protocol.TProtocolFactory protocolFactory,
org.apache.thrift.transport.TNonblockingTransport transport) throws
org.apache.thrift.TException {
+ super(client, protocolFactory, transport, resultHandler, false);
+ this.registry = registry;
+ }
+
+ public void write_args(org.apache.thrift.protocol.TProtocol prot) throws
org.apache.thrift.TException {
+ prot.writeMessageBegin(new
org.apache.thrift.protocol.TMessage("angularRegistryPush",
org.apache.thrift.protocol.TMessageType.CALL, 0));
+ angularRegistryPush_args args = new angularRegistryPush_args();
+ args.setRegistry(registry);
+ args.write(prot);
+ prot.writeMessageEnd();
+ }
+
+ public void getResult() throws org.apache.thrift.TException {
+ if (getState() !=
org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+ throw new IllegalStateException("Method call not finished!");
+ }
+ org.apache.thrift.transport.TMemoryInputTransport memoryTransport =
new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+ org.apache.thrift.protocol.TProtocol prot =
client.getProtocolFactory().getProtocol(memoryTransport);
+ (new Client(prot)).recv_angularRegistryPush();
+ }
+ }
+
}
public static class Processor<I extends Iface> extends
org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
@@ -1331,6 +1370,7 @@ public class RemoteInterpreterService {
processMap.put("angularObjectUpdate", new angularObjectUpdate());
processMap.put("angularObjectAdd", new angularObjectAdd());
processMap.put("angularObjectRemove", new angularObjectRemove());
+ processMap.put("angularRegistryPush", new angularRegistryPush());
return processMap;
}
@@ -1716,6 +1756,26 @@ public class RemoteInterpreterService {
}
}
+ public static class angularRegistryPush<I extends Iface> extends
org.apache.thrift.ProcessFunction<I, angularRegistryPush_args> {
+ public angularRegistryPush() {
+ super("angularRegistryPush");
+ }
+
+ public angularRegistryPush_args getEmptyArgsInstance() {
+ return new angularRegistryPush_args();
+ }
+
+ protected boolean isOneway() {
+ return false;
+ }
+
+ public angularRegistryPush_result getResult(I iface,
angularRegistryPush_args args) throws org.apache.thrift.TException {
+ angularRegistryPush_result result = new angularRegistryPush_result();
+ iface.angularRegistryPush(args.registry);
+ return result;
+ }
+ }
+
}
public static class AsyncProcessor<I extends AsyncIface> extends
org.apache.thrift.TBaseAsyncProcessor<I> {
@@ -1748,6 +1808,7 @@ public class RemoteInterpreterService {
processMap.put("angularObjectUpdate", new angularObjectUpdate());
processMap.put("angularObjectAdd", new angularObjectAdd());
processMap.put("angularObjectRemove", new angularObjectRemove());
+ processMap.put("angularRegistryPush", new angularRegistryPush());
return processMap;
}
@@ -2712,6 +2773,56 @@ public class RemoteInterpreterService {
}
}
+ public static class angularRegistryPush<I extends AsyncIface> extends
org.apache.thrift.AsyncProcessFunction<I, angularRegistryPush_args, Void> {
+ public angularRegistryPush() {
+ super("angularRegistryPush");
+ }
+
+ public angularRegistryPush_args getEmptyArgsInstance() {
+ return new angularRegistryPush_args();
+ }
+
+ public AsyncMethodCallback<Void> getResultHandler(final AsyncFrameBuffer
fb, final int seqid) {
+ final org.apache.thrift.AsyncProcessFunction fcall = this;
+ return new AsyncMethodCallback<Void>() {
+ public void onComplete(Void o) {
+ angularRegistryPush_result result = new
angularRegistryPush_result();
+ try {
+ fcall.sendResponse(fb,result,
org.apache.thrift.protocol.TMessageType.REPLY,seqid);
+ return;
+ } catch (Exception e) {
+ LOGGER.error("Exception writing to internal frame buffer", e);
+ }
+ fb.close();
+ }
+ public void onError(Exception e) {
+ byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
+ org.apache.thrift.TBase msg;
+ angularRegistryPush_result result = new
angularRegistryPush_result();
+ {
+ msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+ msg = (org.apache.thrift.TBase)new
org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR,
e.getMessage());
+ }
+ try {
+ fcall.sendResponse(fb,msg,msgType,seqid);
+ return;
+ } catch (Exception ex) {
+ LOGGER.error("Exception writing to internal frame buffer", ex);
+ }
+ fb.close();
+ }
+ };
+ }
+
+ protected boolean isOneway() {
+ return false;
+ }
+
+ public void start(I iface, angularRegistryPush_args args,
org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws
TException {
+ iface.angularRegistryPush(args.registry,resultHandler);
+ }
+ }
+
}
public static class createInterpreter_args implements
org.apache.thrift.TBase<createInterpreter_args,
createInterpreter_args._Fields>, java.io.Serializable, Cloneable,
Comparable<createInterpreter_args> {
@@ -18355,4 +18466,613 @@ public class RemoteInterpreterService {
}
+ public static class angularRegistryPush_args implements
org.apache.thrift.TBase<angularRegistryPush_args,
angularRegistryPush_args._Fields>, java.io.Serializable, Cloneable,
Comparable<angularRegistryPush_args> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new
org.apache.thrift.protocol.TStruct("angularRegistryPush_args");
+
+ private static final org.apache.thrift.protocol.TField REGISTRY_FIELD_DESC
= new org.apache.thrift.protocol.TField("registry",
org.apache.thrift.protocol.TType.STRING, (short)1);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes
= new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new
angularRegistryPush_argsStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new
angularRegistryPush_argsTupleSchemeFactory());
+ }
+
+ public String registry; // required
+
+ /** The set of fields this struct contains, along with convenience methods
for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ REGISTRY((short)1, "registry");
+
+ private static final Map<String, _Fields> byName = new HashMap<String,
_Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not
found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // REGISTRY
+ return REGISTRY;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " +
fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final Map<_Fields,
org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new
EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.REGISTRY, new
org.apache.thrift.meta_data.FieldMetaData("registry",
org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(angularRegistryPush_args.class,
metaDataMap);
+ }
+
+ public angularRegistryPush_args() {
+ }
+
+ public angularRegistryPush_args(
+ String registry)
+ {
+ this();
+ this.registry = registry;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public angularRegistryPush_args(angularRegistryPush_args other) {
+ if (other.isSetRegistry()) {
+ this.registry = other.registry;
+ }
+ }
+
+ public angularRegistryPush_args deepCopy() {
+ return new angularRegistryPush_args(this);
+ }
+
+ @Override
+ public void clear() {
+ this.registry = null;
+ }
+
+ public String getRegistry() {
+ return this.registry;
+ }
+
+ public angularRegistryPush_args setRegistry(String registry) {
+ this.registry = registry;
+ return this;
+ }
+
+ public void unsetRegistry() {
+ this.registry = null;
+ }
+
+ /** Returns true if field registry is set (has been assigned a value) and
false otherwise */
+ public boolean isSetRegistry() {
+ return this.registry != null;
+ }
+
+ public void setRegistryIsSet(boolean value) {
+ if (!value) {
+ this.registry = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case REGISTRY:
+ if (value == null) {
+ unsetRegistry();
+ } else {
+ setRegistry((String)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case REGISTRY:
+ return getRegistry();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been
assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case REGISTRY:
+ return isSetRegistry();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof angularRegistryPush_args)
+ return this.equals((angularRegistryPush_args)that);
+ return false;
+ }
+
+ public boolean equals(angularRegistryPush_args that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_registry = true && this.isSetRegistry();
+ boolean that_present_registry = true && that.isSetRegistry();
+ if (this_present_registry || that_present_registry) {
+ if (!(this_present_registry && that_present_registry))
+ return false;
+ if (!this.registry.equals(that.registry))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ List<Object> list = new ArrayList<Object>();
+
+ boolean present_registry = true && (isSetRegistry());
+ list.add(present_registry);
+ if (present_registry)
+ list.add(registry);
+
+ return list.hashCode();
+ }
+
+ @Override
+ public int compareTo(angularRegistryPush_args other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison =
Boolean.valueOf(isSetRegistry()).compareTo(other.isSetRegistry());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetRegistry()) {
+ lastComparison =
org.apache.thrift.TBaseHelper.compareTo(this.registry, other.registry);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws
org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws
org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("angularRegistryPush_args(");
+ boolean first = true;
+
+ sb.append("registry:");
+ if (this.registry == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.registry);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws
java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new
org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws
java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new
org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class angularRegistryPush_argsStandardSchemeFactory
implements SchemeFactory {
+ public angularRegistryPush_argsStandardScheme getScheme() {
+ return new angularRegistryPush_argsStandardScheme();
+ }
+ }
+
+ private static class angularRegistryPush_argsStandardScheme extends
StandardScheme<angularRegistryPush_args> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot,
angularRegistryPush_args struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // REGISTRY
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING)
{
+ struct.registry = iprot.readString();
+ struct.setRegistryIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked
in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot,
angularRegistryPush_args struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.registry != null) {
+ oprot.writeFieldBegin(REGISTRY_FIELD_DESC);
+ oprot.writeString(struct.registry);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class angularRegistryPush_argsTupleSchemeFactory implements
SchemeFactory {
+ public angularRegistryPush_argsTupleScheme getScheme() {
+ return new angularRegistryPush_argsTupleScheme();
+ }
+ }
+
+ private static class angularRegistryPush_argsTupleScheme extends
TupleScheme<angularRegistryPush_args> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot,
angularRegistryPush_args struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ BitSet optionals = new BitSet();
+ if (struct.isSetRegistry()) {
+ optionals.set(0);
+ }
+ oprot.writeBitSet(optionals, 1);
+ if (struct.isSetRegistry()) {
+ oprot.writeString(struct.registry);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot,
angularRegistryPush_args struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ BitSet incoming = iprot.readBitSet(1);
+ if (incoming.get(0)) {
+ struct.registry = iprot.readString();
+ struct.setRegistryIsSet(true);
+ }
+ }
+ }
+
+ }
+
+ public static class angularRegistryPush_result implements
org.apache.thrift.TBase<angularRegistryPush_result,
angularRegistryPush_result._Fields>, java.io.Serializable, Cloneable,
Comparable<angularRegistryPush_result> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new
org.apache.thrift.protocol.TStruct("angularRegistryPush_result");
+
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes
= new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new
angularRegistryPush_resultStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new
angularRegistryPush_resultTupleSchemeFactory());
+ }
+
+
+ /** The set of fields this struct contains, along with convenience methods
for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+;
+
+ private static final Map<String, _Fields> byName = new HashMap<String,
_Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not
found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " +
fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+ public static final Map<_Fields,
org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new
EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(angularRegistryPush_result.class,
metaDataMap);
+ }
+
+ public angularRegistryPush_result() {
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public angularRegistryPush_result(angularRegistryPush_result other) {
+ }
+
+ public angularRegistryPush_result deepCopy() {
+ return new angularRegistryPush_result(this);
+ }
+
+ @Override
+ public void clear() {
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been
assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof angularRegistryPush_result)
+ return this.equals((angularRegistryPush_result)that);
+ return false;
+ }
+
+ public boolean equals(angularRegistryPush_result that) {
+ if (that == null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ List<Object> list = new ArrayList<Object>();
+
+ return list.hashCode();
+ }
+
+ @Override
+ public int compareTo(angularRegistryPush_result other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws
org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws
org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("angularRegistryPush_result(");
+ boolean first = true;
+
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws
java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new
org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws
java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new
org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class angularRegistryPush_resultStandardSchemeFactory
implements SchemeFactory {
+ public angularRegistryPush_resultStandardScheme getScheme() {
+ return new angularRegistryPush_resultStandardScheme();
+ }
+ }
+
+ private static class angularRegistryPush_resultStandardScheme extends
StandardScheme<angularRegistryPush_result> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot,
angularRegistryPush_result struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked
in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot,
angularRegistryPush_result struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class angularRegistryPush_resultTupleSchemeFactory
implements SchemeFactory {
+ public angularRegistryPush_resultTupleScheme getScheme() {
+ return new angularRegistryPush_resultTupleScheme();
+ }
+ }
+
+ private static class angularRegistryPush_resultTupleScheme extends
TupleScheme<angularRegistryPush_result> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot,
angularRegistryPush_result struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot,
angularRegistryPush_result struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ }
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
----------------------------------------------------------------------
diff --git
a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
index 74906f8..80212e7 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
@@ -47,7 +47,8 @@ enum RemoteInterpreterEventType {
RESOURCE_POOL_GET_ALL = 6,
RESOURCE_GET = 7
OUTPUT_APPEND = 8,
- OUTPUT_UPDATE = 9
+ OUTPUT_UPDATE = 9,
+ ANGULAR_REGISTRY_PUSH=10
}
struct RemoteInterpreterEvent {
@@ -86,4 +87,5 @@ service RemoteInterpreterService {
object);
void angularObjectAdd(1: string name, 2: string noteId, 3: string
paragraphId, 4: string object);
void angularObjectRemove(1: string name, 2: string noteId, 3: string
paragraphId);
+ void angularRegistryPush(1: string registry);
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
index 182b7a2..f3b936b 100644
---
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -28,7 +28,10 @@ import java.util.Map;
import java.util.Properties;
import org.apache.thrift.transport.TTransportException;
+import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
@@ -42,6 +45,10 @@ import org.apache.zeppelin.scheduler.Scheduler;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
public class RemoteInterpreterTest {
@@ -664,4 +671,29 @@ public class RemoteInterpreterTest {
assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler());
assertNotEquals(intpAsessionA.getScheduler(),
intpAsessionB.getScheduler());
}
+
+ /**
+  * Checks that pushAngularObjectRegistryToRemote sends the JSON form of the
+  * interpreter group's angular object registry to the Thrift client.
+  */
+ @Test
+ public void should_push_local_angular_repo_to_remote() throws Exception {
+   // Given: an interpreter whose group owns a registry holding one angular object
+   final Client remoteClient = Mockito.mock(Client.class);
+   final RemoteInterpreter interpreter = new RemoteInterpreter(new Properties(), "noteId",
+       MockInterpreterA.class.getName(), "runner", "path", "localRepo", env, 10 * 1000, null);
+   final AngularObjectRegistry localRegistry = new AngularObjectRegistry("spark", null);
+   // NOTE(review): "nodeId" looks like a typo for "noteId"; the expected JSON below is
+   // derived from the same registry, so the assertion does not depend on it - confirm.
+   localRegistry.add("name", "DuyHai DOAN", "nodeId", "paragraphId");
+   final InterpreterGroup group = new InterpreterGroup("groupId");
+   group.setAngularObjectRegistry(localRegistry);
+   interpreter.setInterpreterGroup(group);
+
+   // The expected payload is the registry map serialized with its full generic type.
+   final String expectedJson = new Gson().toJson(localRegistry.getRegistry(),
+       new TypeToken<Map<String, Map<String, AngularObject>>>() {}.getType());
+
+   // When
+   interpreter.pushAngularObjectRegistryToRemote(remoteClient);
+
+   // Then
+   Mockito.verify(remoteClient).angularRegistryPush(expectedJson);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-server/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml
index 73e878a..ee03c33 100644
--- a/zeppelin-server/pom.xml
+++ b/zeppelin-server/pom.xml
@@ -192,6 +192,11 @@
</dependency>
<dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.1</version>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
----------------------------------------------------------------------
diff --git
a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
index 7da23ec..f091364 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
@@ -103,6 +103,8 @@ public class Message {
ANGULAR_OBJECT_UPDATED, // [c-s] angular object value updated,
+ ANGULAR_OBJECT_CLIENT_BIND, // [c-s] angular object updated from
AngularJS z object
+
LIST_CONFIGURATIONS, // [c-s] ask all key/value pairs of configurations
CONFIGURATIONS_INFO, // [s-c] all key/value pairs of configurations
// @param settings serialized Map<String, String> object
@@ -131,4 +133,17 @@ public class Message {
public Object get(String k) {
return data.get(k);
}
+
+ /**
+  * Returns the payload entry stored under {@code key}, viewed as the type the
+  * caller expects.
+  *
+  * <p>The cast is unchecked: because of erasure nothing is verified inside this
+  * method, so an entry of a different type only surfaces as a
+  * {@link ClassCastException} at the call site that assigns the result.
+  *
+  * @param key name of the entry to look up in {@code data}
+  * @param <T> type the caller expects the entry to have
+  * @return whatever {@code data.get(key)} returns, typed as {@code T}
+  */
+ @SuppressWarnings("unchecked") // T is chosen by the caller; failure mode is in the Javadoc
+ public <T> T getType(String key) {
+   return (T) data.get(key);
+ }
+
+ /**
+  * Renders this message as {@code Message{data=..., op=...}}.
+  */
+ @Override
+ public String toString() {
+   return "Message{" + "data=" + data + ", op=" + op + '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git
a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index cb1e646..98a1aaa 100644
---
a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++
b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -16,6 +16,15 @@
*/
package org.apache.zeppelin.socket;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import javax.servlet.http.HttpServletRequest;
+
import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
@@ -25,6 +34,8 @@ import
org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -43,13 +54,6 @@ import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.servlet.http.HttpServletRequest;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
/**
* Zeppelin websocket service.
*
@@ -99,6 +103,11 @@ public class NotebookServer extends WebSocketServlet
implements
LOG.debug("RECEIVE PRINCIPAL << " + messagereceived.principal);
LOG.debug("RECEIVE TICKET << " + messagereceived.ticket);
LOG.debug("RECEIVE ROLES << " + messagereceived.roles);
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("RECEIVE MSG = " + messagereceived);
+ }
+
String ticket =
TicketContainer.instance.getTicket(messagereceived.principal);
if (ticket != null && !ticket.equals(messagereceived.ticket))
throw new Exception("Invalid ticket " + messagereceived.ticket + " !=
" + ticket);
@@ -178,6 +187,9 @@ public class NotebookServer extends WebSocketServlet
implements
case ANGULAR_OBJECT_UPDATED:
angularObjectUpdated(conn, userAndRoles, notebook,
messagereceived);
break;
+ case ANGULAR_OBJECT_CLIENT_BIND:
+ angularObjectClientBind(conn, userAndRoles, notebook,
messagereceived);
+ break;
case LIST_CONFIGURATIONS:
sendAllConfigurations(conn, userAndRoles, notebook);
break;
@@ -205,7 +217,7 @@ public class NotebookServer extends WebSocketServlet
implements
return gson.fromJson(msg, Message.class);
}
- private String serializeMessage(Message m) {
+ protected String serializeMessage(Message m) {
return gson.toJson(m);
}
@@ -716,6 +728,91 @@ public class NotebookServer extends WebSocketServlet
implements
}
}
+ /**
+  * Handles a bind request coming from the AngularJS z object: stores the given
+  * Angular variable in the angular object registry of the interpreter group
+  * that serves the target paragraph.
+  *
+  * <p>The note id, variable name, value and paragraph id are read from the
+  * "noteId", "name", "value" and "paragraphId" entries of the message. When
+  * the note cannot be found the request is ignored without an error.
+  *
+  * @param conn socket the request arrived on; forwarded to the push helpers
+  * @param userAndRoles not used by this method
+  * @param notebook notebook used to look up the note
+  * @param fromMessage the received bind message
+  * @throws IllegalArgumentException if the message carries no paragraph id, or
+  *         the note has no paragraph with that id
+  * @throws Exception if resolving the interpreter group fails
+  */
+ protected void angularObjectClientBind(NotebookSocket conn, HashSet<String> userAndRoles,
+     Notebook notebook, Message fromMessage) throws Exception {
+   final String targetNoteId = fromMessage.getType("noteId");
+   final String name = fromMessage.getType("name");
+   final Object value = fromMessage.get("value");
+   final String targetParagraphId = fromMessage.getType("paragraphId");
+   // The note is looked up before the paragraph id is validated.
+   final Note targetNote = notebook.getNote(targetNoteId);
+
+   if (targetParagraphId == null) {
+     throw new IllegalArgumentException(
+         "target paragraph not specified for angular value bind");
+   }
+   if (targetNote == null) {
+     // Unknown note: nothing to bind to.
+     return;
+   }
+
+   final InterpreterGroup group =
+       findInterpreterGroupForParagraph(targetNote, targetParagraphId);
+   final AngularObjectRegistry registry = group.getAngularObjectRegistry();
+
+   if (!(registry instanceof RemoteAngularObjectRegistry)) {
+     pushAngularObjectToLocalRepo(targetNoteId, targetParagraphId, name, value, registry,
+         group.getId(), conn);
+     return;
+   }
+   pushAngularObjectToRemoteRegistry(targetNoteId, targetParagraphId, name, value,
+       (RemoteAngularObjectRegistry) registry, group.getId(), conn);
+ }
+
+ /**
+  * Returns the interpreter group of the interpreter currently used by the
+  * given paragraph of the note.
+  *
+  * @param note note that should contain the paragraph
+  * @param paragraphId id of the paragraph to look up
+  * @return the interpreter group reported by the paragraph's current interpreter
+  * @throws IllegalArgumentException if the note has no paragraph with that id
+  */
+ private InterpreterGroup findInterpreterGroupForParagraph(Note note, String paragraphId)
+     throws Exception {
+   final Paragraph target = note.getParagraph(paragraphId);
+   if (target != null) {
+     return target.getCurrentRepl().getInterpreterGroup();
+   }
+   throw new IllegalArgumentException("Unknown paragraph with id : " + paragraphId);
+ }
+
+ private void pushAngularObjectToRemoteRegistry(String noteId, String
paragraphId,
+ String varName, Object varValue, RemoteAngularObjectRegistry
remoteRegistry,
+ String interpreterGroupId, NotebookSocket conn) {
+
+ final AngularObject ao = remoteRegistry.addAndNotifyRemoteProcess(varName,
varValue,
+ noteId, paragraphId);
+
+ this.broadcastExcept(
+ noteId,
+ new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
+ .put("interpreterGroupId", interpreterGroupId)
+ .put("noteId", noteId)
+ .put("paragraphId", paragraphId),
+ conn);
+ }
+
+ private void pushAngularObjectToLocalRepo(String noteId, String paragraphId,
String varName,
+ Object varValue, AngularObjectRegistry registry,
+ String interpreterGroupId, NotebookSocket conn) {
+ AngularObject angularObject = registry.get(varName, noteId, paragraphId);
+ if (angularObject == null) {
+ angularObject = registry.add(varName, varValue, noteId, paragraphId);
+ } else {
+ angularObject.set(varValue, true);
+ }
+
+ this.broadcastExcept(
+ noteId,
+ new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject",
angularObject)
+ .put("interpreterGroupId", interpreterGroupId)
+ .put("noteId", noteId)
+ .put("paragraphId", paragraphId),
+ conn);
+ }
+
private void moveParagraph(NotebookSocket conn, HashSet<String>
userAndRoles, Notebook notebook,
Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-server/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java
----------------------------------------------------------------------
diff --git
a/zeppelin-server/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java
b/zeppelin-server/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java
new file mode 100644
index 0000000..a6aaae6
--- /dev/null
+++
b/zeppelin-server/src/test/java/org/apache/zeppelin/display/AngularObjectBuilder.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.display;
+
+public class AngularObjectBuilder { // test-tree factory for AngularObject; presumably needed because the constructor is not accessible outside this package - confirm
+ /** Builds an AngularObject from the name, value, note id and paragraph id, passing null as the fifth constructor argument. */
+ public static <T> AngularObject<T> build(String varName, T value, String
noteId,
+ String paragraphId) {
+ return new AngularObject<>(varName, value, noteId, paragraphId, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
----------------------------------------------------------------------
diff --git
a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
index 15d8826..6989c16 100644
---
a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
+++
b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
@@ -20,8 +20,13 @@
package org.apache.zeppelin.socket;
import com.google.gson.Gson;
+
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectBuilder;
+import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.Paragraph;
@@ -36,8 +41,10 @@ import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.HashSet;
import java.util.List;
+import static java.util.Arrays.asList;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@@ -157,6 +164,104 @@ public class NotebookServerTest extends
AbstractTestRestApi {
notebook.removeNote(note.getId());
}
+ @Test
+ public void should_bind_angular_object_to_remote_for_paragraphs() throws
Exception {
+ // Given: an ANGULAR_OBJECT_CLIENT_BIND message targeting paragraph "paragraphId" of note "noteId"
+ final String varName = "name";
+ final String value = "DuyHai DOAN";
+ final Message messageReceived = new Message(OP.ANGULAR_OBJECT_CLIENT_BIND)
+ .put("noteId", "noteId")
+ .put("name", varName)
+ .put("value", value)
+ .put("paragraphId", "paragraphId");
+
+ final NotebookServer server = new NotebookServer();
+ final Notebook notebook = mock(Notebook.class);
+ final Note note = mock(Note.class, RETURNS_DEEP_STUBS);
+ // The mocked notebook resolves "noteId" to the note, and the note resolves "paragraphId" to the paragraph.
+ when(notebook.getNote("noteId")).thenReturn(note);
+ final Paragraph paragraph = mock(Paragraph.class, RETURNS_DEEP_STUBS);
+ when(note.getParagraph("paragraphId")).thenReturn(paragraph);
+
+ // Interpreter group "mdGroup" whose registry is a mocked RemoteAngularObjectRegistry.
+ final RemoteAngularObjectRegistry mdRegistry =
mock(RemoteAngularObjectRegistry.class);
+ final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup");
+ mdGroup.setAngularObjectRegistry(mdRegistry);
+
+ when(paragraph.getCurrentRepl().getInterpreterGroup()).thenReturn(mdGroup);
+
+
+ final AngularObject ao1 = AngularObjectBuilder.build(varName, value,
"noteId", "paragraphId");
+ // The remote registry hands back ao1 for the paragraph-scoped add.
+ when(mdRegistry.addAndNotifyRemoteProcess(varName, value, "noteId",
"paragraphId")).thenReturn(ao1);
+
+ NotebookSocket conn = mock(NotebookSocket.class);
+ NotebookSocket otherConn = mock(NotebookSocket.class);
+ // Serialized ANGULAR_OBJECT_UPDATE message expected on the other connection.
+ final String mdMsg1 = server.serializeMessage(new
Message(OP.ANGULAR_OBJECT_UPDATE)
+ .put("angularObject", ao1)
+ .put("interpreterGroupId", "mdGroup")
+ .put("noteId", "noteId")
+ .put("paragraphId", "paragraphId"));
+ // Both sockets are registered on the note; conn is the one passed to angularObjectClientBind below.
+ server.noteSocketMap.put("noteId", asList(conn, otherConn));
+
+ // When
+ server.angularObjectClientBind(conn, new HashSet<String>(), notebook,
messageReceived);
+
+ // Then
+ verify(mdRegistry, never()).addAndNotifyRemoteProcess(varName, value,
"noteId", null); // NOTE(review): only rules out a null paragraph id; the call with "paragraphId" is not itself verified
+ // The other socket attached to the note must receive the serialized update.
+ verify(otherConn).send(mdMsg1);
+ }
+
+ @Test
+ public void should_bind_angular_object_to_local_for_paragraphs() throws
Exception {
+ // Given: an ANGULAR_OBJECT_CLIENT_BIND message targeting paragraph "paragraphId" of note "noteId"
+ final String varName = "name";
+ final String value = "DuyHai DOAN";
+ final Message messageReceived = new Message(OP.ANGULAR_OBJECT_CLIENT_BIND)
+ .put("noteId", "noteId")
+ .put("name", varName)
+ .put("value", value)
+ .put("paragraphId", "paragraphId");
+
+ final NotebookServer server = new NotebookServer();
+ final Notebook notebook = mock(Notebook.class);
+ final Note note = mock(Note.class, RETURNS_DEEP_STUBS);
+ when(notebook.getNote("noteId")).thenReturn(note);
+ final Paragraph paragraph = mock(Paragraph.class, RETURNS_DEEP_STUBS);
+ when(note.getParagraph("paragraphId")).thenReturn(paragraph);
+ // Interpreter group "mdGroup" whose registry is a mock of the base AngularObjectRegistry type.
+ final AngularObjectRegistry mdRegistry = mock(AngularObjectRegistry.class);
+ final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup");
+ mdGroup.setAngularObjectRegistry(mdRegistry);
+
+ when(paragraph.getCurrentRepl().getInterpreterGroup()).thenReturn(mdGroup);
+
+
+ final AngularObject ao1 = AngularObjectBuilder.build(varName, value,
"noteId", "paragraphId");
+ // Only add() is stubbed; get() is left unstubbed and presumably returns null from the mock - confirm.
+ when(mdRegistry.add(varName, value, "noteId",
"paragraphId")).thenReturn(ao1);
+
+ NotebookSocket conn = mock(NotebookSocket.class);
+ NotebookSocket otherConn = mock(NotebookSocket.class);
+ // Serialized ANGULAR_OBJECT_UPDATE message expected on the other connection.
+ final String mdMsg1 = server.serializeMessage(new
Message(OP.ANGULAR_OBJECT_UPDATE)
+ .put("angularObject", ao1)
+ .put("interpreterGroupId", "mdGroup")
+ .put("noteId", "noteId")
+ .put("paragraphId", "paragraphId"));
+ // Both sockets are registered on the note; conn is the one passed to angularObjectClientBind below.
+ server.noteSocketMap.put("noteId", asList(conn, otherConn));
+
+ // When
+ server.angularObjectClientBind(conn, new HashSet<String>(), notebook,
messageReceived);
+
+ // Then: the other socket attached to the note must receive the serialized update
+ verify(otherConn).send(mdMsg1);
+ }
+
private NotebookSocket createWebSocket() {
NotebookSocket sock = mock(NotebookSocket.class);
when(sock.getRequest()).thenReturn(createHttpServletRequest());
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
index cb7265c..097ee84 100644
--- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
+++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
@@ -23,11 +23,22 @@ angular.module('zeppelinWebApp')
$scope.editor = null;
var paragraphScope = $rootScope.$new(true, $rootScope);
+
// to keep backward compatibility
$scope.compiledScope = paragraphScope;
- var angularObjectRegistry = {};
+ paragraphScope.z = { // helper object attached to paragraphScope as `z`
+ // Example: z.angularBind('my_var', 'Test Value',
'20150213-231621_168813393')
+ angularBind: function(varName, value, paragraphId) {
+ // Only push to the server if a paragraphId was given (truthy); otherwise do nothing
+ if (paragraphId) {
+ websocketMsgSrv.clientBindAngularObject($routeParams.noteId, varName,
value, paragraphId);
+ }
+ }
+ };
+
+ var angularObjectRegistry = {};
var editorModes = {
'ace/mode/scala': /^%spark/,
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js
----------------------------------------------------------------------
diff --git
a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js
b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js
index 245ab77..3fba0f5 100644
--- a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js
+++ b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js
@@ -70,6 +70,18 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv',
function($rootScope,
});
},
+ clientBindAngularObject: function(noteId, name, value, paragraphId) { // sends one ANGULAR_OBJECT_CLIENT_BIND event through websocketEvents
+ websocketEvents.sendNewEvent({
+ op: 'ANGULAR_OBJECT_CLIENT_BIND',
+ data: { // payload: note id, variable name, value and target paragraph id
+ noteId: noteId,
+ name: name,
+ value: value,
+ paragraphId: paragraphId
+ }
+ });
+ },
+
cancelParagraphRun: function(paragraphId) {
websocketEvents.sendNewEvent({op: 'CANCEL_PARAGRAPH', data: {id:
paragraphId}});
},
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/554e78c8/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 09c9026..bb4d69b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -35,6 +35,8 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.*;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Paragraph is a representation of an execution unit.
*
@@ -52,6 +54,13 @@ public class Paragraph extends Job implements Serializable,
Cloneable {
private Map<String, Object> config; // paragraph configs like isOpen,
colWidth, etc
public final GUI settings; // form and parameter settings
+ @VisibleForTesting
+ Paragraph() { // package-private no-arg constructor, annotated as existing for tests
+ super(generateId(), null); // generated id; null where the public constructor passes its listener
+ config = new HashMap<>();
+ settings = new GUI(); // NOTE(review): note and replLoader are not assigned here - presumably left null; confirm callers never reach getRepl()
+ }
+
public Paragraph(Note note, JobListener listener, NoteInterpreterLoader
replLoader) {
super(generateId(), listener);
this.note = note;
@@ -163,6 +172,10 @@ public class Paragraph extends Job implements
Serializable, Cloneable {
return replLoader.get(name);
}
+ public Interpreter getCurrentRepl() { // returns the interpreter that getRepl() yields for the name from getRequiredReplName()
+ return getRepl(getRequiredReplName()); // NOTE(review): completion() null-checks getRequiredReplName(buffer); confirm the no-arg form cannot yield null here
+ }
+
public List<String> completion(String buffer, int cursor) {
String replName = getRequiredReplName(buffer);
if (replName != null) {