Repository: incubator-zeppelin Updated Branches: refs/heads/master c0a7d08c5 -> 58b70e3bc
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java new file mode 100644 index 0000000..31b534e --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.scheduler; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * + */ +public class ExecutorFactory { + private static ExecutorFactory _executor; + private static Long _executorLock = new Long(0); + + Map<String, ExecutorService> executor = new HashMap<String, ExecutorService>(); + + public ExecutorFactory() { + + } + + public static ExecutorFactory singleton() { + if (_executor == null) { + synchronized (_executorLock) { + if (_executor == null) { + _executor = new ExecutorFactory(); + } + } + } + return _executor; + } + + public ExecutorService getDefaultExecutor() { + return createOrGet("default"); + } + + public ExecutorService createOrGet(String name) { + return createOrGet(name, 100); + } + + public ExecutorService createOrGet(String name, int numThread) { + synchronized (executor) { + if (!executor.containsKey(name)) { + executor.put(name, Executors.newScheduledThreadPool(numThread)); + } + return executor.get(name); + } + } + + public void shutdown(String name) { + synchronized (executor) { + if (executor.containsKey(name)) { + ExecutorService e = executor.get(name); + e.shutdown(); + executor.remove(name); + } + } + } + + + public void shutdownAll() { + synchronized (executor) { + for (String name : executor.keySet()){ + shutdown(name); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java index 2556a81..71769b4 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java @@ -22,8 +22,7 @@ import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.slf4j.Logger; @@ -37,7 +36,7 @@ import org.slf4j.LoggerFactory; */ public class SchedulerFactory implements SchedulerListener { private final Logger logger = LoggerFactory.getLogger(SchedulerFactory.class); - ScheduledExecutorService executor; + ExecutorService executor; Map<String, Scheduler> schedulers = new LinkedHashMap<String, Scheduler>(); private static SchedulerFactory singleton; @@ -59,11 +58,11 @@ public class SchedulerFactory implements SchedulerListener { } public SchedulerFactory() throws Exception { - executor = Executors.newScheduledThreadPool(100); + executor = ExecutorFactory.singleton().createOrGet("schedulerFactory", 100); } public void destroy() { - executor.shutdown(); + ExecutorFactory.singleton().shutdown("schedulerFactory"); } public Scheduler createOrGetFIFOScheduler(String name) { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift index 051730e..f9cd181 100644 --- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift +++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift @@ -24,7 +24,8 @@ struct RemoteInterpreterContext { 2: string paragraphTitle, 3: string paragraphText, 4: string config, // json serialized config - 5: string gui // json serialized gui + 5: string gui, // json serialized gui + 6: string runners // json serialized runner } struct RemoteInterpreterResult { @@ -35,6 +36,19 @@ struct RemoteInterpreterResult { 5: string gui // json serialized gui } +enum RemoteInterpreterEventType { + NO_OP = 1, + ANGULAR_OBJECT_ADD = 2, + ANGULAR_OBJECT_UPDATE = 3, + ANGULAR_OBJECT_REMOVE = 4, + RUN_INTERPRETER_CONTEXT_RUNNER = 5 +} + +struct RemoteInterpreterEvent { + 1: RemoteInterpreterEventType type, + 2: string data // json serialized data +} + service RemoteInterpreterService { void createInterpreter(1: string className, 2: map<string, string> properties); @@ -48,4 +62,7 @@ service RemoteInterpreterService { void shutdown(); string getStatus(1:string jobId); + + RemoteInterpreterEvent getEvent(); + void angularObjectUpdate(1: string name, 2: string object); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java new file mode 100644 index 0000000..b0ed45f --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.display; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +public class AngularObjectRegistryTest { + + @Test + public void testBasic() { + final AtomicInteger onAdd = new AtomicInteger(0); + final AtomicInteger onUpdate = new AtomicInteger(0); + final AtomicInteger onRemove = new AtomicInteger(0); + + AngularObjectRegistry registry = new AngularObjectRegistry("intpId", + new AngularObjectRegistryListener() { + + @Override + public void onAdd(String interpreterGroupId, AngularObject object) { + onAdd.incrementAndGet(); + } + + @Override + public void onUpdate(String interpreterGroupId, AngularObject object) { + onUpdate.incrementAndGet(); + } + + @Override + public void onRemove(String interpreterGroupId, AngularObject object) { + onRemove.incrementAndGet(); + } + }); + + registry.add("name1", "value1"); + assertEquals(1, registry.getAll().size()); + assertEquals(1, onAdd.get()); + assertEquals(0, onUpdate.get()); + + registry.get("name1").set("newValue"); + assertEquals(1, onUpdate.get()); + + registry.remove("name1"); + assertEquals(0, registry.getAll().size()); + assertEquals(1, onRemove.get()); + + assertEquals(null, registry.get("name1")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java new file mode 100644 index 0000000..7ccc934 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.display; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.junit.Test; + +public class AngularObjectTest { + + @Test + public void testListener() { + final AtomicInteger updated = new AtomicInteger(0); + AngularObject ao = new AngularObject("name", "value", new AngularObjectListener() { + + @Override + public void updated(AngularObject updatedObject) { + updated.incrementAndGet(); + } + + }); + + assertEquals(0, updated.get()); + ao.set("newValue"); + assertEquals(1, updated.get()); + assertEquals("newValue", ao.get()); + + ao.set("newValue"); + assertEquals(2, updated.get()); + + ao.set("newnewValue", false); + assertEquals(2, updated.get()); + assertEquals("newnewValue", ao.get()); + } + + @Test + public void testWatcher() throws InterruptedException { + final AtomicInteger updated = new AtomicInteger(0); + final AtomicInteger onWatch = new AtomicInteger(0); + AngularObject ao = new AngularObject("name", "value", new AngularObjectListener() { + @Override + public void updated(AngularObject updatedObject) { + updated.incrementAndGet(); + } + }); + + ao.addWatcher(new AngularObjectWatcher(null) { + @Override + public void watch(Object oldObject, Object newObject, InterpreterContext context) { + onWatch.incrementAndGet(); + } + }); + + assertEquals(0, onWatch.get()); + ao.set("newValue"); + + Thread.sleep(500); + assertEquals(1, onWatch.get()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java new file mode 100644 index 0000000..d4909e3 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.AngularObjectRegistryListener; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class RemoteAngularObjectTest implements AngularObjectRegistryListener { + private InterpreterGroup intpGroup; + private HashMap<String, String> env; + private RemoteInterpreter intp; + private InterpreterContext context; + private RemoteAngularObjectRegistry localRegistry; + + private AtomicInteger onAdd; + private AtomicInteger onUpdate; + private AtomicInteger onRemove; + + @Before + public void setUp() throws Exception { + onAdd = new AtomicInteger(0); + onUpdate = new AtomicInteger(0); + onRemove = new AtomicInteger(0); + + intpGroup = new InterpreterGroup("intpId"); + localRegistry = new RemoteAngularObjectRegistry("intpId", this, intpGroup); + intpGroup.setAngularObjectRegistry(localRegistry); + env = new HashMap<String, String>(); + env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); + + Properties p = new Properties(); + + intp = new RemoteInterpreter( + p, + MockInterpreterAngular.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env + ); + + intpGroup.add(intp); + intp.setInterpreterGroup(intpGroup); + + context = new InterpreterContext( + "id", + "title", + "text", + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList<InterpreterContextRunner>()); + + intp.open(); + } + + @After + public void tearDown() throws Exception { + intp.close(); + intpGroup.clone(); + intpGroup.destroy(); + } + + @Test + public void testAngularObjectCRUD() throws InterruptedException { + InterpreterResult ret = intp.interpret("get", context); + Thread.sleep(500); // waitFor eventpoller pool event + String[] result = ret.message().split(" "); + assertEquals("0", result[0]); // size of registry + assertEquals("0", result[1]); // num watcher called + + // create object + ret = intp.interpret("add n1 v1", context); + Thread.sleep(500); + result = ret.message().split(" "); + assertEquals("1", result[0]); // size of registry + assertEquals("0", result[1]); // num watcher called + assertEquals("v1", localRegistry.get("n1").get()); + + // update object + ret = intp.interpret("update n1 v11", context); + result = ret.message().split(" "); + Thread.sleep(500); + assertEquals("1", result[0]); // size of registry + assertEquals("1", result[1]); // num watcher called + assertEquals("v11", localRegistry.get("n1").get()); + + // remove object + ret = intp.interpret("remove n1", context); + result = ret.message().split(" "); + Thread.sleep(500); + assertEquals("0", result[0]); // size of registry + assertEquals("1", result[1]); // num watcher called + assertEquals(null, localRegistry.get("n1")); + } + + @Override + public void onAdd(String interpreterGroupId, AngularObject object) { + onAdd.incrementAndGet(); + } + + @Override + public void onUpdate(String interpreterGroupId, AngularObject object) { + onUpdate.incrementAndGet(); + } + + @Override + public void onRemove(String interpreterGroupId, AngularObject object) { + onRemove.incrementAndGet(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java index 02dc224..fcd6847 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java @@ -22,7 +22,7 @@ import static org.junit.Assert.assertFalse; import java.util.HashMap; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; +import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.junit.Test; @@ -30,11 +30,13 @@ public class RemoteInterpreterProcessTest { @Test public void testStartStop() { - RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>()); + InterpreterGroup intpGroup = new InterpreterGroup(); + RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>(), + new InterpreterContextRunnerPool()); assertFalse(rip.isRunning()); assertEquals(0, rip.referenceCount()); - assertEquals(1, rip.reference()); - assertEquals(2, rip.reference()); + assertEquals(1, rip.reference(intpGroup)); + assertEquals(2, rip.reference(intpGroup)); assertEquals(true, rip.isRunning()); assertEquals(1, rip.dereference()); assertEquals(true, rip.isRunning()); @@ -44,8 +46,10 @@ public class RemoteInterpreterProcessTest { @Test public void testClientFactory() throws Exception { - RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>()); - rip.reference(); + InterpreterGroup intpGroup = new InterpreterGroup(); + RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>(), + new InterpreterContextRunnerPool()); + rip.reference(intpGroup); assertEquals(0, rip.getNumActiveClient()); assertEquals(0, rip.getNumIdleClient()); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index 58299bb..e743eab 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -30,12 +30,12 @@ import java.util.Map; import java.util.Properties; import org.apache.thrift.transport.TTransportException; +import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA; import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB; import org.apache.zeppelin.scheduler.Job; @@ -109,7 +109,9 @@ public class RemoteInterpreterTest { "title", "text", new HashMap<String, Object>(), - new GUI())); + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList<InterpreterContextRunner>())); intpB.open(); assertEquals(2, process.referenceCount()); @@ -159,7 +161,9 @@ public class RemoteInterpreterTest { "title", "text", new HashMap<String, Object>(), - new GUI())); + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList<InterpreterContextRunner>())); assertEquals("500", ret.message()); ret = intpB.interpret("500", @@ -168,7 +172,9 @@ public class RemoteInterpreterTest { "title", "text", new HashMap<String, Object>(), - new GUI())); + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList<InterpreterContextRunner>())); assertEquals("1000", ret.message()); long end = System.currentTimeMillis(); assertTrue(end - start >= 1000); @@ -231,7 +237,9 @@ public class RemoteInterpreterTest { "title", "text", new HashMap<String, Object>(), - new GUI())); + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList<InterpreterContextRunner>())); } @Override @@ -262,7 +270,9 @@ public class RemoteInterpreterTest { "title", "text", new HashMap<String, Object>(), - new GUI())); + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList<InterpreterContextRunner>())); } @Override @@ -333,7 +343,9 @@ public class RemoteInterpreterTest { "title", "text", new HashMap<String, Object>(), - new GUI())); + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList<InterpreterContextRunner>())); synchronized (results) { results.add(ret.message()); @@ -413,7 +425,9 @@ public class RemoteInterpreterTest { "title", "text", new HashMap<String, Object>(), - new GUI())); + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList<InterpreterContextRunner>())); synchronized (results) { results.add(ret.message()); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java new file mode 100644 index 0000000..ff1b8ed --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote.mock; + +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.AngularObjectWatcher; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; + +public class MockInterpreterAngular extends Interpreter { + static { + Interpreter.register( + "angularTest", + "angular", + MockInterpreterA.class.getName(), + new InterpreterPropertyBuilder() + .add("p1", "v1", "property1").build()); + + } + + AtomicInteger numWatch = new AtomicInteger(0); + + public MockInterpreterAngular(Properties property) { + super(property); + } + + @Override + public void open() { + } + + @Override + public void close() { + + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + String[] stmt = st.split(" "); + String cmd = stmt[0]; + String name = null; + if (stmt.length >= 2) { + name = stmt[1]; + } + String value = null; + if (stmt.length == 3) { + value = stmt[2]; + } + + AngularObjectRegistry registry = context.getAngularObjectRegistry(); + + if (cmd.equals("add")) { + registry.add(name, value); + registry.get(name).addWatcher(new AngularObjectWatcher(null) { + + @Override + public void watch(Object oldObject, Object newObject, + InterpreterContext context) { + numWatch.incrementAndGet(); + } + + }); + } else if (cmd.equalsIgnoreCase("update")) { + registry.get(name).set(value); + } else if (cmd.equals("remove")) { + registry.remove(name); + } + + try { + Thread.sleep(500); // wait for watcher executed + } catch (InterruptedException e) { + } + + String msg = registry.getAll().size() + " " + Integer.toString(numWatch.get()); + return new InterpreterResult(Code.SUCCESS, msg); + } + + @Override + public void cancel(InterpreterContext context) { + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<String> completion(String buf, int cursor) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index 2c13ab2..bb0fb80 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -21,17 +21,17 @@ import static org.junit.Assert.assertEquals; import java.io.File; import java.util.HashMap; +import java.util.LinkedList; import java.util.Map; import java.util.Properties; +import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA; -import org.apache.zeppelin.scheduler.Job; -import org.apache.zeppelin.scheduler.Scheduler; -import org.apache.zeppelin.scheduler.SchedulerFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -53,7 +53,7 @@ public class RemoteSchedulerTest { @Test public void test() throws Exception { Properties p = new Properties(); - InterpreterGroup intpGroup = new InterpreterGroup(); + final InterpreterGroup intpGroup = new InterpreterGroup(); Map<String, String> env = new HashMap<String, String>(); env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); @@ -93,7 +93,9 @@ public class RemoteSchedulerTest { "title", "text", new HashMap<String, Object>(), - new GUI())); + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList<InterpreterContextRunner>())); return "1000"; } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 909cc8f..bd55b2d 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -298,7 +298,7 @@ public class ZeppelinServer extends Application { this.schedulerFactory = new SchedulerFactory(); - this.replFactory = new InterpreterFactory(conf); + this.replFactory = new InterpreterFactory(conf, notebookServer); notebook = new Notebook(conf, schedulerFactory, replFactory, notebookServer); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java index a7b8b66..e4626bf 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java @@ -90,6 +90,11 @@ public class Message { // @param notes serialized List<NoteInfo> object PARAGRAPH_REMOVE, + + ANGULAR_OBJECT_UPDATE, // [s-c] add/update angular object + ANGULAR_OBJECT_REMOVE, // [s-c] add angular object del + + ANGULAR_OBJECT_UPDATED // [c-s] angular object value updated } public OP op; http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 415e8c1..69d62d8 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -25,7 +25,11 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.notebook.JobListenerFactory; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.Notebook; @@ -50,7 +54,8 @@ import com.google.gson.Gson; * * @author anthonycorbacho */ -public class NotebookServer extends WebSocketServer implements JobListenerFactory { +public class NotebookServer extends WebSocketServer implements + JobListenerFactory, AngularObjectRegistryListener { private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class); private static final String DEFAULT_ADDR = "0.0.0.0"; @@ -131,6 +136,9 @@ public class NotebookServer extends WebSocketServer implements JobListenerFactor case COMPLETION: completion(conn, notebook, messagereceived); break; + case ANGULAR_OBJECT_UPDATED: + angularObjectUpdated(conn, notebook, messagereceived); + break; default: broadcastNoteList(); break; @@ -220,13 +228,28 @@ public class NotebookServer extends WebSocketServer implements JobListenerFactor return id; } + private void broadcastToNoteBindedInterpreter(String interpreterGroupId, Message m) { + Notebook notebook = notebook(); + List<Note> notes = notebook.getAllNotes(); + for (Note note : notes) { + List<String> ids = note.getNoteReplLoader().getInterpreters(); + for (String id : ids) { + if (id.equals(interpreterGroupId)) { + broadcast(note.id(), m); + } + } + } + } + private void broadcast(String noteId, Message m) { - LOG.info("SEND >> " + m.op); synchronized (noteSocketMap) { List<WebSocket> socketLists = noteSocketMap.get(noteId); if (socketLists == null || socketLists.size() == 0) { return; } + + LOG.info("SEND >> " + m.op); + for (WebSocket conn : socketLists) { conn.send(serializeMessage(m)); } @@ -264,9 +287,11 @@ public class NotebookServer extends WebSocketServer implements JobListenerFactor return; } Note note = notebook.getNote(noteId); + if (note != null) { addConnectionToNote(note.id(), conn); conn.send(serializeMessage(new Message(OP.NOTE).put("note", note))); + sendAllAngularObjects(note, conn); } } @@ -381,6 +406,66 @@ public class NotebookServer extends WebSocketServer implements JobListenerFactor conn.send(serializeMessage(resp)); } + /** + * When angular object updated from client + * @param conn + * @param notebook + * @param fromMessage + */ + private void angularObjectUpdated(WebSocket conn, Notebook notebook, + Message fromMessage) { + String noteId = (String) fromMessage.get("noteId"); + String interpreterGroupId = (String) fromMessage.get("interpreterGroupId"); + String varName = (String) fromMessage.get("name"); + Object varValue = fromMessage.get("value"); + + // propagate change to (Remote) AngularObjectRegistry + Note note = notebook.getNote(noteId); + if (note != null) { + List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings(); + for (InterpreterSetting setting : settings) { + if (setting.getInterpreterGroup() == null) { + continue; + } + + if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) { + AngularObjectRegistry angularObjectRegistry = setting + .getInterpreterGroup().getAngularObjectRegistry(); + AngularObject ao = angularObjectRegistry.get(varName); + if (ao == null) { + LOG.warn("Object {} is not binded", varName); + } else { + // path from client -> server + ao.set(varValue, false); + } + + break; + } + } + } + + // broadcast change to all web session that uses related interpreter. + for (Note n : notebook.getAllNotes()) { + List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings(); + for (InterpreterSetting setting : settings) { + if (setting.getInterpreterGroup() == null) { + continue; + } + + if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) { + AngularObjectRegistry angularObjectRegistry = setting + .getInterpreterGroup().getAngularObjectRegistry(); + AngularObject ao = angularObjectRegistry.get(varName); + this.broadcast(n.id(), new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", ao) + .put("interpreterGroupId", interpreterGroupId) + .put("noteId", n.id())); + } + } + } + } + + private void moveParagraph(WebSocket conn, Notebook notebook, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); @@ -498,4 +583,66 @@ public class NotebookServer extends WebSocketServer implements JobListenerFactor public JobListener getParagraphJobListener(Note note) { return new ParagraphJobListener(this, note); } + + private void sendAllAngularObjects(Note note, WebSocket conn) { + List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings(); + if (settings == null || settings.size() == 0) { + return; + } + + for (InterpreterSetting intpSetting : settings) { + AngularObjectRegistry registry = intpSetting.getInterpreterGroup().getAngularObjectRegistry(); + List<AngularObject> objects = registry.getAll(); + for (AngularObject object : objects) { + conn.send(serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", object) + .put("interpreterGroupId", intpSetting.getInterpreterGroup().getId()) + .put("noteId", note.id()))); + } + } + } + + @Override + public void onAdd(String interpreterGroupId, AngularObject object) { + onUpdate(interpreterGroupId, object); + } + + @Override + public void onUpdate(String interpreterGroupId, AngularObject object) { + Notebook notebook = notebook(); + + List<Note> notes = notebook.getAllNotes(); + for (Note note : notes) { + List<InterpreterSetting> intpSettings = note.getNoteReplLoader() + .getInterpreterSettings(); + + if (intpSettings.isEmpty()) continue; + + for (InterpreterSetting setting : intpSettings) { + if (setting.getInterpreterGroup().getId().equals(interpreterGroupId)) { + broadcast(note.id(), new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", object) + .put("interpreterGroupId", interpreterGroupId) + .put("noteId", note.id())); + } + } + } + } + + @Override + public void onRemove(String interpreterGroupId, AngularObject object) { + Notebook notebook = notebook(); + List<Note> notes = notebook.getAllNotes(); + for (Note note : notes) { + List<String> ids = note.getNoteReplLoader().getInterpreters(); + for (String id : ids) { + if (id.equals(interpreterGroupId)) { + broadcast( + note.id(), + new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", + object.getName())); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java index 3e63503..a3bf289 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java @@ -79,7 +79,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { assertThat(get, isAllowed()); Map<String, Object> resp = gson.fromJson(get.getResponseBodyAsString(), new TypeToken<Map<String, Object>>(){}.getType()); Map<String, Object> body = (Map<String, Object>) resp.get("body"); - assertEquals(6, body.size()); + assertEquals(7, body.size()); get.releaseConnection(); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-web/app/scripts/controllers/main.js ---------------------------------------------------------------------- diff --git a/zeppelin-web/app/scripts/controllers/main.js b/zeppelin-web/app/scripts/controllers/main.js index 535cf78..4948b49 100644 --- a/zeppelin-web/app/scripts/controllers/main.js +++ b/zeppelin-web/app/scripts/controllers/main.js @@ -24,7 +24,7 @@ */ angular.module('zeppelinWebApp') .controller('MainCtrl', function($scope, WebSocket, $rootScope, $window) { - + $rootScope.compiledScope = $scope.$new(true, $rootScope); $scope.WebSocketWaitingList = []; $scope.connected = false; $scope.looknfeel = 'default'; @@ -65,6 +65,8 @@ angular.module('zeppelinWebApp') $scope.$broadcast('updateProgress', data); } else if (op === 'COMPLETION_LIST') { $scope.$broadcast('completionList', data); + } else if (op === 'ANGULAR_OBJECT_UPDATE') { + $scope.$broadcast('angularObjectUpdate', data); } }); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-web/app/scripts/controllers/notebook.js ---------------------------------------------------------------------- diff --git a/zeppelin-web/app/scripts/controllers/notebook.js b/zeppelin-web/app/scripts/controllers/notebook.js index cc295dc..9a9fcd7 100644 --- a/zeppelin-web/app/scripts/controllers/notebook.js +++ b/zeppelin-web/app/scripts/controllers/notebook.js @@ -44,6 +44,8 @@ angular.module('zeppelinWebApp').controller('NotebookCtrl', function($scope, $ro $scope.interpreterSettings = []; $scope.interpreterBindings = []; + var angularObjectRegistry = {}; + $scope.getCronOptionNameFromValue = function(value) { if (!value) { return ''; @@ -442,4 +444,51 @@ angular.module('zeppelinWebApp').controller('NotebookCtrl', function($scope, $ro return true; } }; + + $scope.$on('angularObjectUpdate', function(event, data) { + if (data.noteId === $scope.note.id) { + var scope = $rootScope.compiledScope; + var varName = data.angularObject.name; + + if (angular.equals(data.angularObject.object, scope[varName])) { + // return when update has no change + return; + } + + if (!angularObjectRegistry[varName]) { + angularObjectRegistry[varName] = { + interpreterGroupId : data.interpreterGroupId, + } + } + + angularObjectRegistry[varName].skipEmit = true; + + if (!angularObjectRegistry[varName].clearWatcher) { + angularObjectRegistry[varName].clearWatcher = scope.$watch(varName, function(newValue, oldValue) { + if (angularObjectRegistry[varName].skipEmit) { + angularObjectRegistry[varName].skipEmit = false; + return; + } + + $rootScope.$emit('sendNewEvent', { + op: 'ANGULAR_OBJECT_UPDATED', + data: { + noteId: $routeParams.noteId, + name:varName, + value:newValue, + interpreterGroupId:angularObjectRegistry[varName].interpreterGroupId + } + }); + }); + } + scope[varName] = data.angularObject.object; + } + + }); + + var isFunction = function(functionToCheck) { + var getType = {}; + return functionToCheck && getType.toString.call(functionToCheck) === '[object Function]'; + } + }); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-web/app/scripts/controllers/paragraph.js ---------------------------------------------------------------------- diff --git a/zeppelin-web/app/scripts/controllers/paragraph.js b/zeppelin-web/app/scripts/controllers/paragraph.js index aac8a78..3b1d05f 100644 --- a/zeppelin-web/app/scripts/controllers/paragraph.js +++ b/zeppelin-web/app/scripts/controllers/paragraph.js @@ -25,7 +25,7 @@ * @author anthonycorbacho */ angular.module('zeppelinWebApp') - .controller('ParagraphCtrl', function($scope, $rootScope, $route, $window, $element, $routeParams, $location, $timeout) { + .controller('ParagraphCtrl', function($scope, $rootScope, $route, $window, $element, $routeParams, $location, $timeout, $compile) { $scope.paragraph = null; $scope.editor = null; @@ -56,6 +56,8 @@ angular.module('zeppelinWebApp') $scope.setGraphMode($scope.getGraphMode(), false, false); } else if ($scope.getResultType() === 'HTML') { $scope.renderHtml(); + } else if ($scope.getResultType() === 'ANGULAR') { + $scope.renderAngular(); } }; @@ -77,6 +79,25 @@ angular.module('zeppelinWebApp') }; + $scope.renderAngular = function() { + var retryRenderer = function() { + if (angular.element('#p'+$scope.paragraph.id+'_angular').length) { + try { + angular.element('#p'+$scope.paragraph.id+'_angular').html($scope.paragraph.result.msg); + + $compile(angular.element('#p'+$scope.paragraph.id+'_angular').contents())($rootScope.compiledScope); + } catch(err) { + console.log('ANGULAR rendering error %o', err); + } + } else { + $timeout(retryRenderer,10); + } + }; + $timeout(retryRenderer); + + }; + + var initializeDefault = function() { var config = $scope.paragraph.config; @@ -210,6 +231,8 @@ angular.module('zeppelinWebApp') } } else if (newType === 'HTML') { $scope.renderHtml(); + } else if (newType === 'ANGULAR') { + $scope.renderAngular(); } } }); @@ -1593,5 +1616,4 @@ angular.module('zeppelinWebApp') var redirectToUrl = location.protocol + '//' + location.host + '/#/notebook/' + noteId + '/paragraph/' + $scope.paragraph.id+'?asIframe'; $window.open(redirectToUrl); }; - }); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-web/app/views/paragraph.html ---------------------------------------------------------------------- diff --git a/zeppelin-web/app/views/paragraph.html b/zeppelin-web/app/views/paragraph.html index c77c85a..ef4daaa 100644 --- a/zeppelin-web/app/views/paragraph.html +++ b/zeppelin-web/app/views/paragraph.html @@ -353,6 +353,11 @@ limitations under the License. ng-Init="loadResultType(paragraph.result)"> </div> + <div id="p{{paragraph.id}}_angular" + ng-if="paragraph.result.type == 'ANGULAR'" + ng-Init="loadResultType(paragraph.result)"> + </div> + <img id="{{paragraph.id}}_img" ng-if="paragraph.result.type == 'IMG'" ng-Init="loadResultType(paragraph.result)" http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 4b0d96c..c6c3b82 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -385,6 +385,7 @@ public class ZeppelinConfiguration extends XMLConfiguration { + "org.apache.zeppelin.spark.SparkSqlInterpreter," + "org.apache.zeppelin.spark.DepInterpreter," + "org.apache.zeppelin.markdown.Markdown," + + "org.apache.zeppelin.angular.AngularInterpreter," + "org.apache.zeppelin.shell.ShellInterpreter"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 7c81e90..c8fc485 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -43,7 +43,10 @@ import java.util.Set; import org.apache.commons.lang.ArrayUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; +import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,15 +76,21 @@ public class InterpreterFactory { private InterpreterOption defaultOption; - public InterpreterFactory(ZeppelinConfiguration conf) throws InterpreterException, IOException { - this(conf, new InterpreterOption(true)); + AngularObjectRegistryListener angularObjectRegistryListener; + + public InterpreterFactory(ZeppelinConfiguration conf, + AngularObjectRegistryListener angularObjectRegistryListener) + throws InterpreterException, IOException { + this(conf, new InterpreterOption(true), angularObjectRegistryListener); } - public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption) + public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption, + AngularObjectRegistryListener angularObjectRegistryListener) throws InterpreterException, IOException { this.conf = conf; this.defaultOption = defaultOption; + this.angularObjectRegistryListener = angularObjectRegistryListener; String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS); interpreterClassList = replsConf.split(","); @@ -217,17 +226,20 @@ public class InterpreterFactory { // previously created setting should turn this feature on here. setting.getOption().setRemote(true); - InterpreterGroup interpreterGroup = createInterpreterGroup( - setting.getGroup(), - setting.getOption(), - setting.getProperties()); + InterpreterSetting intpSetting = new InterpreterSetting( setting.id(), setting.getName(), setting.getGroup(), + setting.getOption()); + + InterpreterGroup interpreterGroup = createInterpreterGroup( + setting.id(), + setting.getGroup(), setting.getOption(), - interpreterGroup); + setting.getProperties()); + intpSetting.setInterpreterGroup(interpreterGroup); interpreterSettings.put(k, intpSetting); } @@ -320,25 +332,46 @@ public class InterpreterFactory { InterpreterOption option, Properties properties) throws InterpreterException, IOException { synchronized (interpreterSettings) { - InterpreterGroup interpreterGroup = createInterpreterGroup(groupName, option, properties); InterpreterSetting intpSetting = new InterpreterSetting( name, groupName, - option, - interpreterGroup); - interpreterSettings.put(intpSetting.id(), intpSetting); + option); + + InterpreterGroup interpreterGroup = createInterpreterGroup( + intpSetting.id(), groupName, option, properties); + intpSetting.setInterpreterGroup(interpreterGroup); + + interpreterSettings.put(intpSetting.id(), intpSetting); saveToFile(); return interpreterGroup; } } - private InterpreterGroup createInterpreterGroup(String groupName, + private InterpreterGroup createInterpreterGroup(String id, + String groupName, InterpreterOption option, Properties properties) throws InterpreterException { - InterpreterGroup interpreterGroup = new InterpreterGroup(); + + AngularObjectRegistry angularObjectRegistry; + + InterpreterGroup interpreterGroup = new InterpreterGroup(id); + if (option.isRemote()) { + angularObjectRegistry = new RemoteAngularObjectRegistry( + id, + angularObjectRegistryListener, + interpreterGroup + ); + } else { + angularObjectRegistry = new AngularObjectRegistry( + id, + angularObjectRegistryListener); + } + + interpreterGroup.setAngularObjectRegistry(angularObjectRegistry); + for (String className : interpreterClassList) { Set<String> keys = Interpreter.registeredInterpreters.keySet(); @@ -480,6 +513,7 @@ public class InterpreterFactory { intpsetting.setOption(option); InterpreterGroup interpreterGroup = createInterpreterGroup( + intpsetting.id(), intpsetting.getGroup(), option, properties); intpsetting.setInterpreterGroup(interpreterGroup); saveToFile(); @@ -499,6 +533,7 @@ public class InterpreterFactory { intpsetting.getInterpreterGroup().destroy(); InterpreterGroup interpreterGroup = createInterpreterGroup( + intpsetting.id(), intpsetting.getGroup(), intpsetting.getOption(), intpsetting.getProperties()); intpsetting.setInterpreterGroup(interpreterGroup); } else { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index 04785aa..301ed23 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -36,21 +36,17 @@ public class InterpreterSetting { public InterpreterSetting(String id, String name, String group, - InterpreterOption option, - InterpreterGroup interpreterGroup) { + InterpreterOption option) { this.id = id; this.name = name; this.group = group; - this.properties = interpreterGroup.getProperty(); this.option = option; - this.interpreterGroup = interpreterGroup; } public InterpreterSetting(String name, String group, - InterpreterOption option, - InterpreterGroup interpreterGroup) { - this(generateId(), name, group, option, interpreterGroup); + InterpreterOption option) { + this(generateId(), name, group, option); } public String id() { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 9204a07..b5e68a4 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -28,17 +28,21 @@ import java.util.List; import java.util.Map; import java.util.Random; -import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.notebook.utility.IdHashes; import org.apache.zeppelin.scheduler.Job; +import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.scheduler.JobListener; import org.apache.zeppelin.scheduler.Scheduler; -import org.apache.zeppelin.scheduler.Job.Status; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +58,8 @@ public class Note implements Serializable, JobListener { private String name; private String id; + Map<String, List<AngularObject>> angularObjects = new HashMap<String, List<AngularObject>>(); + private transient NoteInterpreterLoader replLoader; private transient ZeppelinConfiguration conf; private transient JobListenerFactory jobListenerFactory; @@ -110,13 +116,17 @@ public class Note implements Serializable, JobListener { this.conf = conf; } + public Map<String, List<AngularObject>> getAngularObjects() { + return angularObjects; + } + /** * Add paragraph last. * * @param p */ public Paragraph addParagraph() { - Paragraph p = new Paragraph(this, replLoader); + Paragraph p = new Paragraph(this, this, replLoader); synchronized (paragraphs) { paragraphs.add(p); } @@ -130,7 +140,7 @@ public class Note implements Serializable, JobListener { * @param p */ public Paragraph insertParagraph(int index) { - Paragraph p = new Paragraph(this, replLoader); + Paragraph p = new Paragraph(this, this, replLoader); synchronized (paragraphs) { paragraphs.add(index, p); } @@ -268,6 +278,21 @@ public class Note implements Serializable, JobListener { } } + private void snapshotAngularObjectRegistry() { + angularObjects = new HashMap<String, List<AngularObject>>(); + + List<InterpreterSetting> settings = replLoader.getInterpreterSettings(); + if (settings == null || settings.size() == 0) { + return; + } + + for (InterpreterSetting setting : settings) { + InterpreterGroup intpGroup = setting.getInterpreterGroup(); + AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); + angularObjects.put(intpGroup.getId(), registry.getAll()); + } + } + public void persist() throws IOException { GsonBuilder gsonBuilder = new GsonBuilder(); gsonBuilder.setPrettyPrinting(); @@ -283,6 +308,7 @@ public class Note implements Serializable, JobListener { File file = new File(conf.getNotebookDir() + "/" + id + "/note.json"); logger().info("Persist note {} into {}", id, file.getAbsolutePath()); + snapshotAngularObjectRegistry(); String json = gson.toJson(this); FileOutputStream out = new FileOutputStream(file); out.write(json.getBytes(conf.getString(ConfVars.ZEPPELIN_ENCODING))); @@ -316,11 +342,12 @@ public class Note implements Serializable, JobListener { note.setReplLoader(replLoader); note.jobListenerFactory = jobListenerFactory; for (Paragraph p : note.paragraphs) { + p.setNote(note); + if (p.getStatus() == Status.PENDING || p.getStatus() == Status.RUNNING) { p.setStatus(Status.ABORT); } } - return note; } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 2d9ba36..844763f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -29,7 +31,10 @@ import java.util.Map; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.interpreter.InterpreterFactory; +import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; @@ -160,6 +165,10 @@ public class Notebook { if (dirs == null) { return; } + + Map<String, SnapshotAngularObject> angularObjectSnapshot = + new HashMap<String, SnapshotAngularObject>(); + for (File f : dirs) { boolean isHidden = f.getName().startsWith("."); if (f.isDirectory() && !isHidden) { @@ -174,18 +183,84 @@ public class Notebook { jobListenerFactory, quartzSched); noteInterpreterLoader.setNoteId(note.id()); + // restore angular object -------------- + Date lastUpdatedDate = new Date(0); + for (Paragraph p : note.getParagraphs()) { + if (p.getDateFinished() != null && + lastUpdatedDate.before(p.getDateFinished())) { + lastUpdatedDate = p.getDateFinished(); + } + } + + Map<String, List<AngularObject>> savedObjects = note.getAngularObjects(); + + if (savedObjects != null) { + for (String intpGroupName : savedObjects.keySet()) { + List<AngularObject> objectList = savedObjects.get(intpGroupName); + + for (AngularObject savedObject : objectList) { + SnapshotAngularObject snapshot = angularObjectSnapshot.get(savedObject.getName()); + if (snapshot == null || snapshot.getLastUpdate().before(lastUpdatedDate)) { + angularObjectSnapshot.put( + savedObject.getName(), + new SnapshotAngularObject( + intpGroupName, + savedObject, + lastUpdatedDate)); + } + } + } + } + synchronized (notes) { notes.put(note.id(), note); refreshCron(note.id()); } } } + + for (String name : angularObjectSnapshot.keySet()) { + SnapshotAngularObject snapshot = angularObjectSnapshot.get(name); + List<InterpreterSetting> settings = replFactory.get(); + for (InterpreterSetting setting : settings) { + InterpreterGroup intpGroup = setting.getInterpreterGroup(); + if (intpGroup.getId().equals(snapshot.getIntpGroupId())) { + AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); + if (registry.get(name) == null) { + registry.add(name, snapshot.getAngularObject().get(), false); + } + } + } + } + } + + class SnapshotAngularObject { + String intpGroupId; + AngularObject angularObject; + Date lastUpdate; + + public SnapshotAngularObject(String intpGroupId, + AngularObject angularObject, Date lastUpdate) { + super(); + this.intpGroupId = intpGroupId; + this.angularObject = angularObject; + this.lastUpdate = lastUpdate; + } + + public String getIntpGroupId() { + return intpGroupId; + } + public AngularObject getAngularObject() { + return angularObject; + } + public Date getLastUpdate() { + return lastUpdate; + } } public List<Note> getAllNotes() { synchronized (notes) { List<Note> noteList = new ArrayList<Note>(notes.values()); - logger.info("" + noteList.size()); Collections.sort(noteList, new Comparator() { @Override public int compare(Object one, Object two) { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index e0986bf..79dfc3d 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -19,16 +19,20 @@ package org.apache.zeppelin.notebook; import java.io.Serializable; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; +import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.display.Input; import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.Interpreter.FormType; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.Interpreter.FormType; +import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.JobListener; import org.slf4j.Logger; @@ -42,14 +46,16 @@ import org.slf4j.LoggerFactory; public class Paragraph extends Job implements Serializable { private static final transient long serialVersionUID = -6328572073497992016L; private transient NoteInterpreterLoader replLoader; + private transient Note note; String title; String text; private Map<String, Object> config; // paragraph configs like isOpen, colWidth, etc public final GUI settings; // form and parameter settings - public Paragraph(JobListener listener, NoteInterpreterLoader replLoader) { + public Paragraph(Note note, JobListener listener, NoteInterpreterLoader replLoader) { super(generateId(), listener); + this.note = note; this.replLoader = replLoader; title = null; text = null; @@ -79,6 +85,14 @@ public class Paragraph extends Job implements Serializable { this.title = title; } + public void setNote(Note note) { + this.note = note; + } + + public Note getNote() { + return note; + } + public String getRequiredReplName() { return getRequiredReplName(text); } @@ -207,14 +221,43 @@ public class Paragraph extends Job implements Serializable { } private InterpreterContext getInterpreterContext() { + AngularObjectRegistry registry = null; + + if (!getNoteReplLoader().getInterpreterSettings().isEmpty()) { + InterpreterSetting intpGroup = getNoteReplLoader().getInterpreterSettings().get(0); + registry = intpGroup.getInterpreterGroup().getAngularObjectRegistry(); + } + + List<InterpreterContextRunner> runners = new LinkedList<InterpreterContextRunner>(); + for (Paragraph p : note.getParagraphs()) { + runners.add(new ParagraphRunner(note, note.id(), p.getId())); + } + InterpreterContext interpreterContext = new InterpreterContext(getId(), this.getTitle(), this.getText(), this.getConfig(), - this.settings); + this.settings, + registry, + runners); return interpreterContext; } + static class ParagraphRunner extends InterpreterContextRunner { + private Note note; + + public ParagraphRunner(Note note, String noteId, String paragraphId) { + super(noteId, paragraphId); + this.note = note; + } + + @Override + public void run() { + note.run(getParagraphId()); + } + } + + private Logger logger() { Logger logger = LoggerFactory.getLogger(Paragraph.class); return logger; http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java index 5199300..63aef0d 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java @@ -54,8 +54,8 @@ public class InterpreterFactoryTest { System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath()); System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2"); conf = new ZeppelinConfiguration(); - factory = new InterpreterFactory(conf, new InterpreterOption(false)); - context = new InterpreterContext("id", "title", "text", null, null); + factory = new InterpreterFactory(conf, new InterpreterOption(false), null); + context = new InterpreterContext("id", "title", "text", null, null, null, null); } @@ -122,7 +122,7 @@ public class InterpreterFactoryTest { factory.add("newsetting", "mock1", new InterpreterOption(false), new Properties()); assertEquals(3, factory.get().size()); - InterpreterFactory factory2 = new InterpreterFactory(conf); + InterpreterFactory factory2 = new InterpreterFactory(conf, null); assertEquals(3, factory2.get().size()); } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index 88af541..8d2c65a 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -69,7 +69,7 @@ public class NotebookTest implements JobListenerFactory{ MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1"); MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2"); - factory = new InterpreterFactory(conf, new InterpreterOption(false)); + factory = new InterpreterFactory(conf, new InterpreterOption(false), null); notebook = new Notebook(conf, schedulerFactory, factory, this); } @@ -108,7 +108,7 @@ public class NotebookTest implements JobListenerFactory{ p1.setText("hello world"); note.persist(); - Notebook notebook2 = new Notebook(conf, schedulerFactory, new InterpreterFactory(conf), this); + Notebook notebook2 = new Notebook(conf, schedulerFactory, new InterpreterFactory(conf, null), this); assertEquals(1, notebook2.getAllNotes().size()); }
