Repository: incubator-zeppelin Updated Branches: refs/heads/master 218a3b5bc -> ddf2c89ec
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ByteBufferInputStream.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ByteBufferInputStream.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ByteBufferInputStream.java new file mode 100644 index 0000000..a8becb4 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ByteBufferInputStream.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * InputStream from bytebuffer + */ +public class ByteBufferInputStream extends InputStream { + + ByteBuffer buf; + + public ByteBufferInputStream(ByteBuffer buf) { + this.buf = buf; + } + + public int read() throws IOException { + if (!buf.hasRemaining()) { + return -1; + } + return buf.get() & 0xFF; + } + + public int read(byte[] bytes, int off, int len) throws IOException { + if (!buf.hasRemaining()) { + return -1; + } + len = Math.min(len, buf.remaining()); + buf.get(bytes, off, len); + return len; + } + + public static InputStream get(ByteBuffer buf) { + if (buf.hasArray()) { + return new ByteArrayInputStream(buf.array()); + } else { + return new ByteBufferInputStream(buf); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java new file mode 100644 index 0000000..3f03b92 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +/** + * distributed resource pool + */ +public class DistributedResourcePool extends LocalResourcePool { + + private final ResourcePoolConnector connector; + + public DistributedResourcePool(String id, ResourcePoolConnector connector) { + super(id); + this.connector = connector; + } + + @Override + public Resource get(String name) { + return get(name, true); + } + + /** + * get resource by name. + * @param name + * @param remote false only return from local resource + * @return null if resource not found. + */ + public Resource get(String name, boolean remote) { + // try local first + Resource resource = super.get(name); + if (resource != null) { + return resource; + } + + if (remote) { + ResourceSet resources = connector.getAllResources().filterByName(name); + if (resources.isEmpty()) { + return null; + } else { + return resources.get(0); + } + } else { + return null; + } + } + + @Override + public ResourceSet getAll() { + return getAll(true); + } + + /** + * Get all resource from the pool + * @param remote false only return local resource + * @return + */ + public ResourceSet getAll(boolean remote) { + ResourceSet all = super.getAll(); + if (remote) { + all.addAll(connector.getAllResources()); + } + return all; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java new file mode 100644 index 0000000..cc5f7e9 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import java.util.*; + +/** + * ResourcePool + */ +public class LocalResourcePool implements ResourcePool { + private final String resourcePoolId; + private final Map<ResourceId, Resource> resources = Collections.synchronizedMap( + new HashMap<ResourceId, Resource>()); + + /** + * @param id unique id + */ + public LocalResourcePool(String id) { + resourcePoolId = id; + } + + /** + * Get unique id of this resource pool + * @return + */ + @Override + public String id() { + return resourcePoolId; + } + + /** + * Get resource + * @return null if resource not found + */ + @Override + public Resource get(String name) { + ResourceId resourceId = new ResourceId(resourcePoolId, name); + return resources.get(resourceId); + } + + @Override + public ResourceSet getAll() { + return new ResourceSet(resources.values()); + } + + /** + * Put resource into the pull + * @param + * @param object object to put into the resource + */ + @Override + public void put(String name, Object object) { + ResourceId resourceId = new ResourceId(resourcePoolId, name); + + Resource resource = new Resource(resourceId, object); + resources.put(resourceId, resource); + } + + @Override + public Resource remove(String name) { + return resources.remove(new ResourceId(resourcePoolId, name)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java new file mode 100644 index 0000000..5a8a9ea --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +/** + * Resource that can retrieve data from remote + */ +public class RemoteResource extends Resource { + ResourcePoolConnector resourcePoolConnector; + + RemoteResource(ResourceId resourceId, Object r) { + super(resourceId, r); + } + + RemoteResource(ResourceId resourceId, boolean serializable, String className) { + super(resourceId, serializable, className); + } + + @Override + public Object get() { + if (isSerializable()) { + Object o = resourcePoolConnector.readResource(getResourceId()); + return o; + } else { + return null; + } + } + + @Override + public boolean isLocal() { + return false; + } + + public ResourcePoolConnector getResourcePoolConnector() { + return resourcePoolConnector; + } + + public void setResourcePoolConnector(ResourcePoolConnector resourcePoolConnector) { + this.resourcePoolConnector = resourcePoolConnector; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java new file mode 100644 index 0000000..6988b3e --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import java.io.*; +import java.nio.ByteBuffer; + +/** + * Information and reference to the resource + */ +public class Resource { + private final transient Object r; + private final boolean serializable; + private final ResourceId resourceId; + private final String className; + + + /** + * Create local resource + * @param resourceId + * @param r must not be null + */ + Resource(ResourceId resourceId, Object r) { + this.r = r; + this.resourceId = resourceId; + this.serializable = r instanceof Serializable; + this.className = r.getClass().getName(); + } + + /** + * Create remote object + * @param resourceId + */ + Resource(ResourceId resourceId, boolean serializable, String className) { + this.r = null; + this.resourceId = resourceId; + this.serializable = serializable; + this.className = className; + } + + public ResourceId getResourceId() { + return resourceId; + } + + public String getClassName() { + return className; + } + + /** + * + * @return null when this is remote resource and not serializable. + */ + public Object get() { + if (isLocal() || isSerializable()){ + return r; + } else { + return null; + } + } + + public boolean isSerializable() { + return serializable; + } + + /** + * if it is remote object + * @return + */ + public boolean isRemote() { + return !isLocal(); + } + + /** + * Whether it is locally accessible or not + * @return + */ + public boolean isLocal() { + return true; + } + + + + public static ByteBuffer serializeObject(Object o) throws IOException { + if (o == null || !(o instanceof Serializable)) { + return null; + } + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try { + ObjectOutputStream oos; + oos = new ObjectOutputStream(out); + oos.writeObject(o); + oos.close(); + out.close(); + } catch (Exception e) { + e.printStackTrace(); + } + return ByteBuffer.wrap(out.toByteArray()); + } + + public static Object deserializeObject(ByteBuffer buf) + throws IOException, ClassNotFoundException { + if (buf == null) { + return null; + } + InputStream ins = ByteBufferInputStream.get(buf); + ObjectInputStream oin; + Object object = null; + + oin = new ObjectInputStream(ins); + object = oin.readObject(); + oin.close(); + ins.close(); + + return object; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java new file mode 100644 index 0000000..a0d55e3 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +/** + * Identifying resource + */ +public class ResourceId { + private final String resourcePoolId; + private final String name; + + ResourceId(String resourcePoolId, String name) { + this.resourcePoolId = resourcePoolId; + this.name = name; + } + + public String getResourcePoolId() { + return resourcePoolId; + } + + public String getName() { + return name; + } + + @Override + public int hashCode() { + return (resourcePoolId + name).hashCode(); + } + + @Override + public boolean equals(Object o) { + if (o instanceof ResourceId) { + ResourceId r = (ResourceId) o; + return (r.name.equals(name) && r.resourcePoolId.equals(resourcePoolId)); + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePool.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePool.java new file mode 100644 index 0000000..6328b8d --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePool.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +/** + * Interface for ResourcePool + */ +public interface ResourcePool { + /** + * Get unique id of the resource pool + * @return + */ + public String id(); + + /** + * Get resource from name + * @param name Resource name + * @return null if resource not found + */ + public Resource get(String name); + + /** + * Get all resources + * @return + */ + public ResourceSet getAll(); + + /** + * Put an object into resource pool + * @param name + * @param object + */ + public void put(String name, Object object); + + /** + * Remove object + * @param name Resource name to remove + * @return removed Resource. null if resource not found + */ + public Resource remove(String name); +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java new file mode 100644 index 0000000..af343db --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +/** + * Connect resource pools running in remote process + */ +public interface ResourcePoolConnector { + /** + * Get list of resources from all other resource pools in remote processes + * @return + */ + public ResourceSet getAllResources(); + + /** + * Read remote object + * @return + */ + public Object readResource(ResourceId id); +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java new file mode 100644 index 0000000..a03655b --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.regex.Pattern; + +/** + * List of resources + */ +public class ResourceSet extends LinkedList<Resource> { + + public ResourceSet(Collection<Resource> resources) { + super(resources); + } + + public ResourceSet() { + super(); + } + + public ResourceSet filterByNameRegex(String regex) { + ResourceSet result = new ResourceSet(); + for (Resource r : this) { + if (Pattern.matches(regex, r.getResourceId().getName())) { + result.add(r); + } + } + return result; + } + + public ResourceSet filterByName(String name) { + ResourceSet result = new ResourceSet(); + for (Resource r : this) { + if (r.getResourceId().getName().equals(name)) { + result.add(r); + } + } + return result; + } + + public ResourceSet filterByClassnameRegex(String regex) { + ResourceSet result = new ResourceSet(); + for (Resource r : this) { + if (Pattern.matches(regex, r.getClassName())) { + result.add(r); + } + } + return result; + } + + public ResourceSet filterByClassname(String className) { + ResourceSet result = new ResourceSet(); + for (Resource r : this) { + if (r.getClassName().equals(className)) { + result.add(r); + } + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift index 5cd14a2..3d6a62e 100644 --- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift +++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift @@ -43,8 +43,10 @@ enum RemoteInterpreterEventType { ANGULAR_OBJECT_UPDATE = 3, ANGULAR_OBJECT_REMOVE = 4, RUN_INTERPRETER_CONTEXT_RUNNER = 5, - OUTPUT_APPEND = 6, - OUTPUT_UPDATE = 7 + RESOURCE_POOL_GET_ALL = 6, + RESOURCE_GET = 7 + OUTPUT_APPEND = 8, + OUTPUT_UPDATE = 9 } struct RemoteInterpreterEvent { @@ -53,7 +55,7 @@ struct RemoteInterpreterEvent { } service RemoteInterpreterService { - void createInterpreter(1: string className, 2: map<string, string> properties); + void createInterpreter(1: string intpGroupId, 2: string className, 3: map<string, string> properties); void open(1: string className); void close(1: string className); @@ -67,8 +69,18 @@ service RemoteInterpreterService { string getStatus(1:string jobId); RemoteInterpreterEvent getEvent(); + + // as a response, ZeppelinServer send list of resources to Interpreter process + void resourcePoolResponseGetAll(1: list<string> resources); + // as a response, ZeppelinServer send serialized value of resource + void resourceResponseGet(1: string resourceId, 2: binary object); + // get all resources in the interpreter process + list<string> resoucePoolGetAll(); + // get value of resource + binary resourceGet(1: string resourceName); + void angularObjectUpdate(1: string name, 2: string noteId, 3: string paragraphId, 4: string object); void angularObjectAdd(1: string name, 2: string noteId, 3: string paragraphId, 4: string object); void angularObjectRemove(1: string name, 2: string noteId, 3: string paragraphId); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java index 9c2732d..40fd2ed 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java @@ -27,7 +27,7 @@ public class InterpreterContextTest { public void testThreadLocal() { assertNull(InterpreterContext.get()); - InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null, null)); + InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null, null, null)); assertNotNull(InterpreterContext.get()); InterpreterContext.remove(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java index 385b9d6..b6801e4 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java @@ -34,6 +34,8 @@ import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular; +import org.apache.zeppelin.resource.LocalResourcePool; +import org.apache.zeppelin.resource.ResourcePool; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -85,6 +87,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), new LinkedList<InterpreterContextRunner>(), null); intp.open(); @@ -93,7 +96,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener { @After public void tearDown() throws Exception { intp.close(); - intpGroup.clone(); + intpGroup.close(); intpGroup.destroy(); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java index f229f6b..7ebe597 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java @@ -71,14 +71,15 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce private InterpreterContext createInterpreterContext() { return new InterpreterContext( - "noteId", - "id", - "title", - "text", - new HashMap<String, Object>(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList<InterpreterContextRunner>(), null); + "noteId", + "id", + "title", + "text", + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + null, + new LinkedList<InterpreterContextRunner>(), null); } @Test http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index 82ca8d4..4af9ba4 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -37,6 +37,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA; import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB; +import org.apache.zeppelin.resource.LocalResourcePool; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.scheduler.Scheduler; @@ -123,6 +124,7 @@ public class RemoteInterpreterTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), new LinkedList<InterpreterContextRunner>(), null)); intpB.open(); @@ -156,6 +158,7 @@ public class RemoteInterpreterTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), new LinkedList<InterpreterContextRunner>(), null)); assertEquals(Code.ERROR, ret.code()); @@ -204,6 +207,7 @@ public class RemoteInterpreterTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), new LinkedList<InterpreterContextRunner>(), null)); assertEquals("500", ret.message()); @@ -216,6 +220,7 @@ public class RemoteInterpreterTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), new LinkedList<InterpreterContextRunner>(), null)); assertEquals("1000", ret.message()); long end = System.currentTimeMillis(); @@ -267,6 +272,7 @@ public class RemoteInterpreterTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), new LinkedList<InterpreterContextRunner>(), null)); } @@ -301,6 +307,7 @@ public class RemoteInterpreterTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), new LinkedList<InterpreterContextRunner>(), null)); } @@ -366,6 +373,7 @@ public class RemoteInterpreterTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), new LinkedList<InterpreterContextRunner>(), null)); synchronized (results) { @@ -443,6 +451,7 @@ public class RemoteInterpreterTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), new LinkedList<InterpreterContextRunner>(), null)); synchronized (results) { @@ -541,6 +550,7 @@ public class RemoteInterpreterTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), new LinkedList<InterpreterContextRunner>(), null)); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java new file mode 100644 index 0000000..1db68ad --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote.mock; + +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.gson.Gson; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.AngularObjectWatcher; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.resource.ResourcePool; + +public class MockInterpreterResourcePool extends Interpreter { + static { + Interpreter.register( + "resourcePoolTest", + "resourcePool", + MockInterpreterA.class.getName(), + new InterpreterPropertyBuilder() + .add("p1", "v1", "property1").build()); + + } + + AtomicInteger numWatch = new AtomicInteger(0); + + public MockInterpreterResourcePool(Properties property) { + super(property); + } + + @Override + public void open() { + } + + @Override + public void close() { + + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + String[] stmt = st.split(" "); + String cmd = stmt[0]; + String name = null; + if (stmt.length >= 2) { + name = stmt[1]; + } + String value = null; + if (stmt.length == 3) { + value = stmt[2]; + } + + ResourcePool resourcePool = context.getResourcePool(); + Object ret = null; + if (cmd.equals("put")) { + resourcePool.put(name, value); + } else if (cmd.equalsIgnoreCase("get")) { + ret = resourcePool.get(name).get(); + } else if (cmd.equals("remove")) { + ret = resourcePool.remove(name); + } else if (cmd.equals("getAll")) { + ret = resourcePool.getAll(); + } + + try { + Thread.sleep(500); // wait for watcher executed + } catch (InterruptedException e) { + } + + Gson gson = new Gson(); + return new InterpreterResult(Code.SUCCESS, gson.toJson(ret)); + } + + @Override + public void cancel(InterpreterContext context) { + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<String> completion(String buf, int cursor) { + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java new file mode 100644 index 0000000..bedaa02 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import com.google.gson.Gson; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller; +import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterResourcePool; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Unittest for DistributedResourcePool + */ +public class DistributedResourcePoolTest { + private InterpreterGroup intpGroup1; + private InterpreterGroup intpGroup2; + private HashMap<String, String> env; + private RemoteInterpreter intp1; + private RemoteInterpreter intp2; + private InterpreterContext context; + private RemoteInterpreterEventPoller eventPoller1; + private RemoteInterpreterEventPoller eventPoller2; + + + @Before + public void setUp() throws Exception { + env = new HashMap<String, String>(); + env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); + + Properties p = new Properties(); + + intp1 = new RemoteInterpreter( + p, + MockInterpreterResourcePool.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + null + ); + + intpGroup1 = new InterpreterGroup("intpGroup1"); + intpGroup1.add(intp1); + intp1.setInterpreterGroup(intpGroup1); + + intp2 = new RemoteInterpreter( + p, + MockInterpreterResourcePool.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + null + ); + + intpGroup2 = new InterpreterGroup("intpGroup2"); + intpGroup2.add(intp2); + intp2.setInterpreterGroup(intpGroup2); + + context = new InterpreterContext( + "note", + "id", + "title", + "text", + new HashMap<String, Object>(), + new GUI(), + null, + null, + new LinkedList<InterpreterContextRunner>(), + null); + + intp1.open(); + intp2.open(); + + eventPoller1 = new RemoteInterpreterEventPoller(null); + eventPoller1.setInterpreterGroup(intpGroup1); + eventPoller1.setInterpreterProcess(intpGroup1.getRemoteInterpreterProcess()); + + eventPoller2 = new RemoteInterpreterEventPoller(null); + eventPoller2.setInterpreterGroup(intpGroup2); + eventPoller2.setInterpreterProcess(intpGroup2.getRemoteInterpreterProcess()); + + eventPoller1.start(); + eventPoller2.start(); + } + + @After + public void tearDown() throws Exception { + eventPoller1.shutdown(); + intp1.close(); + intpGroup1.close(); + intpGroup1.destroy(); + eventPoller2.shutdown(); + intp2.close(); + intpGroup2.close(); + intpGroup2.destroy(); + } + + @Test + public void testRemoteDistributedResourcePool() { + Gson gson = new Gson(); + InterpreterResult ret; + intp1.interpret("put key1 value1", context); + intp2.interpret("put key2 value2", context); + + ret = intp1.interpret("getAll", context); + assertEquals(2, gson.fromJson(ret.message(), ResourceSet.class).size()); + + ret = intp2.interpret("getAll", context); + assertEquals(2, gson.fromJson(ret.message(), ResourceSet.class).size()); + + ret = intp1.interpret("get key1", context); + assertEquals("value1", gson.fromJson(ret.message(), String.class)); + + ret = intp1.interpret("get key2", context); + assertEquals("value2", gson.fromJson(ret.message(), String.class)); + } + + @Test + public void testDistributedResourcePool() { + final LocalResourcePool pool2 = new LocalResourcePool("pool2"); + final LocalResourcePool pool3 = new LocalResourcePool("pool3"); + + DistributedResourcePool pool1 = new DistributedResourcePool("pool1", new ResourcePoolConnector() { + @Override + public ResourceSet getAllResources() { + ResourceSet set = pool2.getAll(); + set.addAll(pool3.getAll()); + + ResourceSet remoteSet = new ResourceSet(); + Gson gson = new Gson(); + for (Resource s : set) { + RemoteResource remoteResource = gson.fromJson(gson.toJson(s), RemoteResource.class); + remoteResource.setResourcePoolConnector(this); + remoteSet.add(remoteResource); + } + return remoteSet; + } + + @Override + public Object readResource(ResourceId id) { + if (id.getResourcePoolId().equals(pool2.id())) { + return pool2.get(id.getName()).get(); + } + if (id.getResourcePoolId().equals(pool3.id())) { + return pool3.get(id.getName()).get(); + } + return null; + } + }); + + assertEquals(0, pool1.getAll().size()); + + + // test get() can get from pool + pool2.put("object1", "value2"); + assertEquals(1, pool1.getAll().size()); + assertTrue(pool1.get("object1").isRemote()); + assertEquals("value2", pool1.get("object1").get()); + + // test get() is locality aware + pool1.put("object1", "value1"); + assertEquals(1, pool2.getAll().size()); + assertEquals("value1", pool1.get("object1").get()); + + // test getAll() is locality aware + assertEquals("value1", pool1.getAll().get(0).get()); + assertEquals("value2", pool1.getAll().get(1).get()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/LocalResourcePoolTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/LocalResourcePoolTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/LocalResourcePoolTest.java new file mode 100644 index 0000000..65d284b --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/LocalResourcePoolTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import org.junit.Test; + +import static org.junit.Assert.*; + +/** + * Unittest for LocalResourcePool + */ +public class LocalResourcePoolTest { + + @Test + public void testGetPutResourcePool() { + + LocalResourcePool pool = new LocalResourcePool("pool1"); + assertEquals("pool1", pool.id()); + + assertNull(pool.get("notExists")); + pool.put("item1", "value1"); + Resource resource = pool.get("item1"); + assertNotNull(resource); + assertEquals(pool.id(), resource.getResourceId().getResourcePoolId()); + assertEquals("value1", resource.get()); + assertTrue(resource.isLocal()); + assertTrue(resource.isSerializable()); + + assertEquals(1, pool.getAll().size()); + + assertNotNull(pool.remove("item1")); + assertNull(pool.remove("item1")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java new file mode 100644 index 0000000..ca64525 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Unit test for ResourceSet + */ +public class ResourceSetTest { + + @Test + public void testFilterByName() { + ResourceSet set = new ResourceSet(); + + set.add(new Resource(new ResourceId("poo1", "resource1"), "value1")); + set.add(new Resource(new ResourceId("poo1", "resource2"), new Integer(2))); + assertEquals(2, set.filterByNameRegex(".*").size()); + assertEquals(1, set.filterByNameRegex("resource1").size()); + assertEquals(1, set.filterByNameRegex("resource2").size()); + assertEquals(0, set.filterByNameRegex("res").size()); + assertEquals(2, set.filterByNameRegex("res.*").size()); + } + + @Test + public void testFilterByClassName() { + ResourceSet set = new ResourceSet(); + + set.add(new Resource(new ResourceId("poo1", "resource1"), "value1")); + set.add(new Resource(new ResourceId("poo1", "resource2"), new Integer(2))); + + assertEquals(1, set.filterByClassnameRegex(".*String").size()); + assertEquals(1, set.filterByClassnameRegex(String.class.getName()).size()); + assertEquals(1, set.filterByClassnameRegex(".*Integer").size()); + assertEquals(1, set.filterByClassnameRegex(Integer.class.getName()).size()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java new file mode 100644 index 0000000..fb8b271 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertEquals; + +/** + * Test for Resource + */ +public class ResourceTest { + @Test + public void testSerializeDeserialize() throws IOException, ClassNotFoundException { + ByteBuffer buffer = Resource.serializeObject("hello"); + assertEquals("hello", Resource.deserializeObject(buffer)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index d46b8cf..2bdcd4f 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -34,13 +34,15 @@ import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA; +import org.apache.zeppelin.resource.LocalResourcePool; import org.apache.zeppelin.scheduler.Job.Status; import org.junit.After; import org.junit.Before; import org.junit.Test; -public class RemoteSchedulerTest { +public class RemoteSchedulerTest implements RemoteInterpreterProcessListener { private SchedulerFactory schedulerSvc; private static final int TICK_WAIT = 100; @@ -71,7 +73,7 @@ public class RemoteSchedulerTest { "fakeRepo", env, 10 * 1000, - null); + this); intpGroup.add(intpA); intpA.setInterpreterGroup(intpGroup); @@ -104,6 +106,7 @@ public class RemoteSchedulerTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), new LinkedList<InterpreterContextRunner>(), null)); return "1000"; } @@ -155,7 +158,7 @@ public class RemoteSchedulerTest { "fakeRepo", env, 10 * 1000, - null); + this); intpGroup.add(intpA); intpA.setInterpreterGroup(intpGroup); @@ -175,6 +178,7 @@ public class RemoteSchedulerTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), new LinkedList<InterpreterContextRunner>(), null); @Override @@ -211,6 +215,7 @@ public class RemoteSchedulerTest { new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), new LinkedList<InterpreterContextRunner>(), null); @Override @@ -270,4 +275,13 @@ public class RemoteSchedulerTest { schedulerSvc.removeScheduler("test"); } + @Override + public void onOutputAppend(String noteId, String paragraphId, String output) { + + } + + @Override + public void onOutputUpdated(String noteId, String paragraphId, String output) { + + } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 7a87c92..df80cd0 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -493,9 +493,12 @@ public class Notebook { boolean releaseResource = false; try { - releaseResource = (boolean) note.getConfig().get("releaseresource"); - } catch (java.lang.ClassCastException e) { - logger.error(e.toString(), e); + Map<String, Object> config = note.getConfig(); + if (config != null && config.containsKey("releaseresource")) { + releaseResource = (boolean) note.getConfig().get("releaseresource"); + } + } catch (ClassCastException e) { + logger.error(e.getMessage(), e); } if (releaseResource) { for (InterpreterSetting setting : note.getNoteReplLoader().getInterpreterSettings()) { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 65210f5..bf17c35 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -23,6 +23,7 @@ import org.apache.zeppelin.display.Input; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.Interpreter.FormType; import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.resource.ResourcePool; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.JobListener; import org.slf4j.Logger; @@ -256,10 +257,12 @@ public class Paragraph extends Job implements Serializable, Cloneable { private InterpreterContext getInterpreterContext() { AngularObjectRegistry registry = null; + ResourcePool resourcePool = null; if (!getNoteReplLoader().getInterpreterSettings().isEmpty()) { InterpreterSetting intpGroup = getNoteReplLoader().getInterpreterSettings().get(0); registry = intpGroup.getInterpreterGroup().getAngularObjectRegistry(); + resourcePool = intpGroup.getInterpreterGroup().getResourcePool(); } List<InterpreterContextRunner> runners = new LinkedList<InterpreterContextRunner>(); @@ -276,6 +279,7 @@ public class Paragraph extends Job implements Serializable, Cloneable { this.getConfig(), this.settings, registry, + resourcePool, runners, new InterpreterOutput(new InterpreterOutputListener() { @Override http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java index 8fea693..d9e965e 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java @@ -62,7 +62,7 @@ public class InterpreterFactoryTest { conf = new ZeppelinConfiguration(); depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo"); factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, depResolver); - context = new InterpreterContext("note", "id", "title", "text", null, null, null, null, null); + context = new InterpreterContext("note", "id", "title", "text", null, null, null, null, null, null); }
