Repository: incubator-zeppelin Updated Branches: refs/heads/master 29a7f8e74 -> e3f57d1a6
Notebook Storage in S3 To store notebooks in S3, it is necessary to set the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. In S3, create a folder structure as follows: bucket/yourUserName/notebook/ In zeppelin-env.sh set the bucket name and userName... Author: Victor <[email protected]> Closes #154 from vgmartinez/master and squashes the following commits: 926fce7 [Victor] Merge branch 'master' of https://github.com/apache/incubator-zeppelin c284a52 [Victor] Merge branch 'master' of https://github.com/apache/incubator-zeppelin 6b8abcd [Victor] add description to properties 2781621 [Victor] change the property name 518ee79 [Victor] renaming variables aecff8e [Victor] add licensed a6d4dff [Victor] change class storage to VFSNotebookRepo 08b0675 [Victor] fix the version of eirslett 4ccc5c6 [Victor] set variable in zeppelin-env.sh 07ae304 [Victor] s3 notebook storage Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/e3f57d1a Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/e3f57d1a Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/e3f57d1a Branch: refs/heads/master Commit: e3f57d1a685cb67910c114b3ff1f893bf8ebbdb9 Parents: 29a7f8e Author: Victor <[email protected]> Authored: Mon Jul 20 17:00:33 2015 +0200 Committer: Lee moon soo <[email protected]> Committed: Wed Jul 22 14:33:53 2015 +0900 ---------------------------------------------------------------------- conf/zeppelin-env.sh.template | 34 ++-- conf/zeppelin-site.xml.template | 21 +++ zeppelin-server/pom.xml | 4 + zeppelin-zengine/pom.xml | 8 +- .../org/apache/zeppelin/conf/Credentials.java | 43 +++++ .../zeppelin/conf/ZeppelinConfiguration.java | 11 ++ .../zeppelin/notebook/repo/S3NotebookRepo.java | 180 +++++++++++++++++++ 7 files changed, 284 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/e3f57d1a/conf/zeppelin-env.sh.template ---------------------------------------------------------------------- diff --git a/conf/zeppelin-env.sh.template b/conf/zeppelin-env.sh.template index 43d302b..bfaf436 100644 --- a/conf/zeppelin-env.sh.template +++ b/conf/zeppelin-env.sh.template @@ -17,26 +17,28 @@ # # export JAVA_HOME= -# export MASTER= # Spark master url. eg. spark://master_addr:7077. Leave empty if you want to use local mode. -# export ZEPPELIN_JAVA_OPTS # Additional jvm options. for example, export ZEPPELIN_JAVA_OPTS="-Dspark.executor.memory=8g -Dspark.cores.max=16" -# export ZEPPELIN_MEM # Zeppelin jvm mem options Default -Xmx1024m -XX:MaxPermSize=512m -# export ZEPPELIN_INTP_MEM # zeppelin interpreter process jvm mem options. Default = ZEPPELIN_MEM -# export ZEPPELIN_INTP_JAVA_OPTS # zeppelin interpreter process jvm options. Default = ZEPPELIN_JAVA_OPTS +# export MASTER= # Spark master url. eg. spark://master_addr:7077. Leave empty if you want to use local mode. +# export ZEPPELIN_JAVA_OPTS # Additional jvm options. for example, export ZEPPELIN_JAVA_OPTS="-Dspark.executor.memory=8g -Dspark.cores.max=16" +# export ZEPPELIN_MEM # Zeppelin jvm mem options Default -Xmx1024m -XX:MaxPermSize=512m +# export ZEPPELIN_INTP_MEM # zeppelin interpreter process jvm mem options. Default = ZEPPELIN_MEM +# export ZEPPELIN_INTP_JAVA_OPTS # zeppelin interpreter process jvm options. Default = ZEPPELIN_JAVA_OPTS -# export ZEPPELIN_LOG_DIR # Where log files are stored. PWD by default. -# export ZEPPELIN_PID_DIR # The pid files are stored. /tmp by default. -# export ZEPPELIN_NOTEBOOK_DIR # Where notebook saved -# export ZEPPELIN_IDENT_STRING # A string representing this instance of zeppelin. $USER by default. -# export ZEPPELIN_NICENESS # The scheduling priority for daemons. Defaults to 0. +# export ZEPPELIN_LOG_DIR # Where log files are stored. PWD by default. 
+# export ZEPPELIN_PID_DIR # The pid files are stored. /tmp by default. +# export ZEPPELIN_NOTEBOOK_DIR # Where notebook saved +# export ZEPPELIN_NOTEBOOK_S3_BUCKET # Bucket where notebook saved +# export ZEPPELIN_NOTEBOOK_S3_USER # User in bucket where notebook saved. For example bucket/user/notebook/2A94M5J1Z/note.json +# export ZEPPELIN_IDENT_STRING # A string representing this instance of zeppelin. $USER by default. +# export ZEPPELIN_NICENESS # The scheduling priority for daemons. Defaults to 0. -# export ZEPPELIN_SPARK_USEHIVECONTEXT # Use HiveContext instead of SQLContext if set true. true by default. -# export ZEPPELIN_SPARK_CONCURRENTSQL # Execute multiple SQL concurrently if set true. false by default. -# export ZEPPELIN_SPARK_MAXRESULT # Max number of SparkSQL result to display. 1000 by default. +# export ZEPPELIN_SPARK_USEHIVECONTEXT # Use HiveContext instead of SQLContext if set true. true by default. +# export ZEPPELIN_SPARK_CONCURRENTSQL # Execute multiple SQL concurrently if set true. false by default. +# export ZEPPELIN_SPARK_MAXRESULT # Max number of SparkSQL result to display. 1000 by default. # Options read in YARN client mode -# export HADOOP_CONF_DIR # yarn-site.xml is located in configuration directory in HADOOP_CONF_DIR. +# export HADOOP_CONF_DIR # yarn-site.xml is located in configuration directory in HADOOP_CONF_DIR. # Pyspark (supported with Spark 1.2.1 and above) # To configure pyspark, you need to set spark distribution's path to 'spark.home' property in Interpreter setting screen in Zeppelin GUI -# export PYSPARK_PYTHON # path to the python command. must be the same path on the driver(Zeppelin) and all workers. -# export PYTHONPATH # extra PYTHONPATH. +# export PYSPARK_PYTHON # path to the python command. must be the same path on the driver(Zeppelin) and all workers. +# export PYTHONPATH # extra PYTHONPATH. 
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/e3f57d1a/conf/zeppelin-site.xml.template ---------------------------------------------------------------------- diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 7bd5c3a..c2294cb 100644 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -52,6 +52,27 @@ <description>path or URI for notebook persist</description> </property> +<!-- If used S3 to storage the notebooks, it is necessary the following folder structure bucketname/username/notebook/ --> +<!-- +<property> + <name>zeppelin.notebook.s3.user</name> + <value>user</value> + <description>user name for s3 folder structure</description> +</property> + +<property> + <name>zeppelin.notebook.s3.bucket</name> + <value>zeppelin</value> + <description>bucket name for notebook storage</description> +</property> + +<property> + <name>zeppelin.notebook.storage</name> + <value>org.apache.zeppelin.notebook.repo.S3NotebookRepo</value> + <description>notebook persistence layer implementation</description> +</property> +--> + <property> <name>zeppelin.notebook.storage</name> <value>org.apache.zeppelin.notebook.repo.VFSNotebookRepo</value> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/e3f57d1a/zeppelin-server/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml index 29a6a5b..e85a3ae 100644 --- a/zeppelin-server/pom.xml +++ b/zeppelin-server/pom.xml @@ -151,6 +151,10 @@ <groupId>org.scala-lang</groupId> <artifactId>scalap</artifactId> </exclusion> + <exclusion> + <artifactId>joda-time</artifactId> + <groupId>joda-time</groupId> + </exclusion> </exclusions> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/e3f57d1a/zeppelin-zengine/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml 
index e6de0b8..76ef04a 100644 --- a/zeppelin-zengine/pom.xml +++ b/zeppelin-zengine/pom.xml @@ -50,7 +50,13 @@ <artifactId>zeppelin-interpreter</artifactId> <version>${project.version}</version> </dependency> - + + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-s3</artifactId> + <version>1.10.1</version> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/e3f57d1a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/Credentials.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/Credentials.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/Credentials.java new file mode 100644 index 0000000..87248a6 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/Credentials.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.zeppelin.conf; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.BasicAWSCredentials; + +/** + * + * @author vgmartinez + * + */ +public class Credentials { + static String aws_access_key_id = System.getenv("AWS_ACCESS_KEY_ID"); + static String aws_secret_access_key = System.getenv("AWS_SECRET_ACCESS_KEY"); + + private static AWSCredentials credentials = new BasicAWSCredentials(aws_access_key_id, + aws_secret_access_key); + + public AWSCredentials getCredentials() { + return credentials; + } + + public static void setCredentials(AWSCredentials credentials) { + Credentials.credentials = credentials; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/e3f57d1a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 8f04f9a..d5c8155 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.configuration.tree.ConfigurationNode; +import org.apache.zeppelin.notebook.repo.S3NotebookRepo; import org.apache.zeppelin.notebook.repo.VFSNotebookRepo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -340,6 +341,14 @@ public class ZeppelinConfiguration extends XMLConfiguration { public String getNotebookDir() { return getString(ConfVars.ZEPPELIN_NOTEBOOK_DIR); } + + public String getUser() { + return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_USER); + } + + public String getBucketName() { + return 
getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_BUCKET); + } public String getInterpreterDir() { return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_DIR); @@ -411,6 +420,8 @@ public class ZeppelinConfiguration extends XMLConfiguration { ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"), + ZEPPELIN_NOTEBOOK_S3_BUCKET("zeppelin.notebook.s3.bucket", "zeppelin"), + ZEPPELIN_NOTEBOOK_S3_USER("zeppelin.notebook.s3.user", "user"), ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", VFSNotebookRepo.class.getName()), ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner", "bin/interpreter.sh"), // Decide when new note is created, interpreter settings will be binded automatically or not. http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/e3f57d1a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/S3NotebookRepo.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/S3NotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/S3NotebookRepo.java new file mode 100644 index 0000000..0b90262 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/S3NotebookRepo.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.LinkedList; +import java.util.List; + +import org.apache.commons.io.IOUtils; +import org.apache.zeppelin.conf.Credentials; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NoteInfo; +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.scheduler.Job.Status; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +/** + * + * @author vgmartinez + * + */ +public class S3NotebookRepo implements NotebookRepo{ + + Logger logger = LoggerFactory.getLogger(S3NotebookRepo.class); + Credentials aws = new Credentials(); + private static String bucketName = ""; + 
String user = ""; + + AmazonS3 s3client = new AmazonS3Client(aws.getCredentials()); + + private ZeppelinConfiguration conf; + + public S3NotebookRepo(ZeppelinConfiguration conf) throws IOException { + this.conf = conf; + user = conf.getUser(); + bucketName = conf.getBucketName(); + } + + @Override + public List<NoteInfo> list() throws IOException { + List<NoteInfo> infos = new LinkedList<NoteInfo>(); + NoteInfo info = null; + try { + ListObjectsRequest listObjectsRequest = new ListObjectsRequest() + .withBucketName(bucketName) + .withPrefix(user + "/" + "notebook"); + ObjectListing objectListing; + do { + objectListing = s3client.listObjects(listObjectsRequest); + + for (S3ObjectSummary objectSummary : + objectListing.getObjectSummaries()) { + if (objectSummary.getKey().contains("note.json")) { + try { + info = getNoteInfo(objectSummary.getKey()); + if (info != null) { + infos.add(info); + } + } catch (IOException e) { + logger.error("Can't read note ", e); + } + } + } + + listObjectsRequest.setMarker(objectListing.getNextMarker()); + } while (objectListing.isTruncated()); + } catch (AmazonServiceException ase) { + + } catch (AmazonClientException ace) { + logger.info("Caught an AmazonClientException, " + + "which means the client encountered " + + "an internal error while trying to communicate" + + " with S3, " + + "such as not being able to access the network."); + logger.info("Error Message: " + ace.getMessage()); + } + return infos; + } + + private Note getNote(String key) throws IOException { + GsonBuilder gsonBuilder = new GsonBuilder(); + gsonBuilder.setPrettyPrinting(); + Gson gson = gsonBuilder.create(); + + S3Object s3object = s3client.getObject(new GetObjectRequest( + bucketName, key)); + + InputStream ins = s3object.getObjectContent(); + String json = IOUtils.toString(ins, conf.getString(ConfVars.ZEPPELIN_ENCODING)); + ins.close(); + Note note = gson.fromJson(json, Note.class); + + for (Paragraph p : note.getParagraphs()) { + if (p.getStatus() == 
Status.PENDING || p.getStatus() == Status.RUNNING) { + p.setStatus(Status.ABORT); + } + } + return note; + } + + private NoteInfo getNoteInfo(String key) throws IOException { + Note note = getNote(key); + return new NoteInfo(note); + } + + @Override + public Note get(String noteId) throws IOException { + return getNote(user + "/" + "notebook" + "/" + noteId + "/" + "note.json"); + } + + @Override + public void save(Note note) throws IOException { + GsonBuilder gsonBuilder = new GsonBuilder(); + gsonBuilder.setPrettyPrinting(); + Gson gson = gsonBuilder.create(); + String json = gson.toJson(note); + String key = user + "/" + "notebook" + "/" + note.id() + "/" + "note.json"; + + File file = File.createTempFile("note", "json"); + file.deleteOnExit(); + Writer writer = new OutputStreamWriter(new FileOutputStream(file)); + + writer.write(json); + writer.close(); + s3client.putObject(new PutObjectRequest( + bucketName, key, file)); + } + + @Override + public void remove(String noteId) throws IOException { + + String key = user + "/" + "notebook" + "/" + noteId; + final ListObjectsRequest listObjectsRequest = new ListObjectsRequest() + .withBucketName(bucketName).withPrefix(key); + + ObjectListing objects = s3client.listObjects(listObjectsRequest); + do { + for (S3ObjectSummary objectSummary : objects.getObjectSummaries()) { + s3client.deleteObject(bucketName, objectSummary.getKey()); + } + objects = s3client.listNextBatchOfObjects(objects); + } while (objects.isTruncated()); + } +}
