http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java index 92e7b9e..02e5114 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java @@ -20,63 +20,62 @@ package org.apache.zeppelin.notebook.repo; import java.io.IOException; import java.util.List; import java.util.Map; + +import org.apache.commons.lang.StringUtils; import org.apache.zeppelin.annotation.ZeppelinApi; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.NoteInfo; import org.apache.zeppelin.user.AuthenticationInfo; -/** Notebook repository (persistence layer) abstraction */ +/** + * Notebook repository (persistence layer) abstraction + */ public interface NotebookRepo { void init(ZeppelinConfiguration zConf) throws IOException; /** * Lists notebook information about all notebooks in storage. - * * @param subject contains user information. * @return * @throws IOException */ - @ZeppelinApi - public List<NoteInfo> list(AuthenticationInfo subject) throws IOException; + @ZeppelinApi public List<NoteInfo> list(AuthenticationInfo subject) throws IOException; /** * Get the notebook with the given id. - * * @param noteId is note id. * @param subject contains user information. * @return * @throws IOException */ - @ZeppelinApi - public Note get(String noteId, AuthenticationInfo subject) throws IOException; + @ZeppelinApi public Note get(String noteId, AuthenticationInfo subject) throws IOException; /** * Save given note in storage - * * @param note is the note itself. * @param subject contains user information. * @throws IOException */ - @ZeppelinApi - public void save(Note note, AuthenticationInfo subject) throws IOException; + @ZeppelinApi public void save(Note note, AuthenticationInfo subject) throws IOException; /** * Remove note with given id. - * * @param noteId is the note id. * @param subject contains user information. * @throws IOException */ - @ZeppelinApi - public void remove(String noteId, AuthenticationInfo subject) throws IOException; + @ZeppelinApi public void remove(String noteId, AuthenticationInfo subject) throws IOException; - /** Release any underlying resources */ - @ZeppelinApi - public void close(); + /** + * Release any underlying resources + */ + @ZeppelinApi public void close(); - /** Versioning API (optional, preferred to have). */ + /** + * Versioning API (optional, preferred to have). + */ /** * Get NotebookRepo settings got the given user. @@ -84,8 +83,7 @@ public interface NotebookRepo { * @param subject * @return */ - @ZeppelinApi - public List<NotebookRepoSettingsInfo> getSettings(AuthenticationInfo subject); + @ZeppelinApi public List<NotebookRepoSettingsInfo> getSettings(AuthenticationInfo subject); /** * update notebook repo settings. @@ -93,6 +91,6 @@ public interface NotebookRepo { * @param settings * @param subject */ - @ZeppelinApi - public void updateSettings(Map<String, String> settings, AuthenticationInfo subject); + @ZeppelinApi public void updateSettings(Map<String, String> settings, AuthenticationInfo subject); + }
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSettingsInfo.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSettingsInfo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSettingsInfo.java index 4bbf37f..0525502 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSettingsInfo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSettingsInfo.java @@ -22,13 +22,15 @@ import java.util.Map; /** * Notebook repo settings. This represent a structure of a notebook repo settings that will mostly * used in the frontend. + * */ public class NotebookRepoSettingsInfo { - /** Type of value, It can be text or list. */ + /** + * Type of value, It can be text or list. + */ public enum Type { - INPUT, - DROPDOWN + INPUT, DROPDOWN } public static NotebookRepoSettingsInfo newInstance() { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java index e2c4657..38665ff 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java @@ -18,8 +18,6 @@ package org.apache.zeppelin.notebook.repo; import com.google.common.collect.Lists; -import java.io.IOException; -import java.util.*; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.notebook.Note; @@ -31,7 +29,14 @@ import org.apache.zeppelin.user.AuthenticationInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Notebook repository sync with remote storage */ +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.*; + +/** + * Notebook repository sync with remote storage + */ public class NotebookRepoSync implements NotebookRepoWithVersionControl { private static final Logger LOG = LoggerFactory.getLogger(NotebookRepoSync.class); private static final int maxRepoNum = 2; @@ -45,7 +50,9 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { private List<NotebookRepo> repos = new ArrayList<>(); private boolean oneWaySync; - /** @param conf */ + /** + * @param conf + */ @SuppressWarnings("static-access") public NotebookRepoSync(ZeppelinConfiguration conf) throws IOException { init(conf); @@ -61,12 +68,8 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { } String[] storageClassNames = allStorageClassNames.split(","); if (storageClassNames.length > getMaxRepoNum()) { - LOG.warn( - "Unsupported number {} of storage classes in ZEPPELIN_NOTEBOOK_STORAGE : {}\n" - + "first {} will be used", - storageClassNames.length, - allStorageClassNames, - getMaxRepoNum()); + LOG.warn("Unsupported number {} of storage classes in ZEPPELIN_NOTEBOOK_STORAGE : {}\n" + + "first {} will be used", storageClassNames.length, allStorageClassNames, getMaxRepoNum()); } for (int i = 0; i < Math.min(storageClassNames.length, getMaxRepoNum()); i++) { @@ -98,35 +101,37 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { NotebookRepoWithSettings repoWithSettings; for (NotebookRepo repo : repos) { - repoWithSettings = - NotebookRepoWithSettings.builder(repo.getClass().getSimpleName()) - .className(repo.getClass().getName()) - .settings(repo.getSettings(subject)) - .build(); + repoWithSettings = NotebookRepoWithSettings + .builder(repo.getClass().getSimpleName()) + .className(repo.getClass().getName()) + .settings(repo.getSettings(subject)) + .build(); reposSetting.add(repoWithSettings); } return reposSetting; } - public NotebookRepoWithSettings updateNotebookRepo( - String name, Map<String, String> settings, AuthenticationInfo subject) { + public NotebookRepoWithSettings updateNotebookRepo(String name, Map<String, String> settings, + AuthenticationInfo subject) { NotebookRepoWithSettings updatedSettings = NotebookRepoWithSettings.EMPTY; for (NotebookRepo repo : repos) { if (repo.getClass().getName().equals(name)) { repo.updateSettings(settings, subject); - updatedSettings = - NotebookRepoWithSettings.builder(repo.getClass().getSimpleName()) - .className(repo.getClass().getName()) - .settings(repo.getSettings(subject)) - .build(); + updatedSettings = NotebookRepoWithSettings + .builder(repo.getClass().getSimpleName()) + .className(repo.getClass().getName()) + .settings(repo.getSettings(subject)) + .build(); break; } } return updatedSettings; } - /** Lists Notebooks from the first repository */ + /** + * Lists Notebooks from the first repository + */ @Override public List<NoteInfo> list(AuthenticationInfo subject) throws IOException { return getRepo(0).list(subject); @@ -137,7 +142,9 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { return getRepo(repoIndex).list(subject); } - /** Returns from Notebook from the first repository */ + /** + * Returns from Notebook from the first repository + */ @Override public Note get(String noteId, AuthenticationInfo subject) throws IOException { return getRepo(0).get(noteId, subject); @@ -148,14 +155,17 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { return getRepo(repoIndex).get(noteId, subject); } - /** Saves to all repositories */ + /** + * Saves to all repositories + */ @Override public void save(Note note, AuthenticationInfo subject) throws IOException { getRepo(0).save(note, subject); if (getRepoCount() > 1) { try { getRepo(1).save(note, subject); - } catch (IOException e) { + } + catch (IOException e) { LOG.info(e.getMessage() + ": Failed to write to secondary storage"); } } @@ -188,12 +198,12 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { NotebookAuthorization auth = NotebookAuthorization.getInstance(); NotebookRepo srcRepo = getRepo(sourceRepoIndex); NotebookRepo dstRepo = getRepo(destRepoIndex); - List<NoteInfo> allSrcNotes = srcRepo.list(subject); - List<NoteInfo> srcNotes = auth.filterByUser(allSrcNotes, subject); - List<NoteInfo> dstNotes = dstRepo.list(subject); + List <NoteInfo> allSrcNotes = srcRepo.list(subject); + List <NoteInfo> srcNotes = auth.filterByUser(allSrcNotes, subject); + List <NoteInfo> dstNotes = dstRepo.list(subject); - Map<String, List<String>> noteIds = - notesCheckDiff(srcNotes, srcRepo, dstNotes, dstRepo, subject); + Map<String, List<String>> noteIds = notesCheckDiff(srcNotes, srcRepo, dstNotes, dstRepo, + subject); List<String> pushNoteIds = noteIds.get(pushKey); List<String> pullNoteIds = noteIds.get(pullKey); List<String> delDstNoteIds = noteIds.get(delDstKey); @@ -235,12 +245,8 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { sync(0, 1, subject); } - private void pushNotes( - AuthenticationInfo subject, - List<String> ids, - NotebookRepo localRepo, - NotebookRepo remoteRepo, - boolean setPermissions) { + private void pushNotes(AuthenticationInfo subject, List<String> ids, NotebookRepo localRepo, + NotebookRepo remoteRepo, boolean setPermissions) { for (String id : ids) { try { remoteRepo.save(localRepo.get(id, subject), subject); @@ -257,7 +263,7 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { NotebookAuthorization notebookAuthorization = NotebookAuthorization.getInstance(); return notebookAuthorization.getOwners(noteId).isEmpty() && notebookAuthorization.getReaders(noteId).isEmpty() - && notebookAuthorization.getRunners(noteId).isEmpty() + && notebookAuthorization.getRunners(noteId).isEmpty() && notebookAuthorization.getWriters(noteId).isEmpty(); } @@ -298,25 +304,18 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { public NotebookRepo getRepo(int repoIndex) throws IOException { if (repoIndex < 0 || repoIndex >= getRepoCount()) { - throw new IOException( - "Requested storage index " - + repoIndex - + " isn't initialized," - + " repository count is " - + getRepoCount()); + throw new IOException("Requested storage index " + repoIndex + + " isn't initialized," + " repository count is " + getRepoCount()); } return repos.get(repoIndex); } - private Map<String, List<String>> notesCheckDiff( - List<NoteInfo> sourceNotes, - NotebookRepo sourceRepo, - List<NoteInfo> destNotes, - NotebookRepo destRepo, + private Map<String, List<String>> notesCheckDiff(List<NoteInfo> sourceNotes, + NotebookRepo sourceRepo, List<NoteInfo> destNotes, NotebookRepo destRepo, AuthenticationInfo subject) { - List<String> pushIDs = new ArrayList<>(); - List<String> pullIDs = new ArrayList<>(); - List<String> delDstIDs = new ArrayList<>(); + List <String> pushIDs = new ArrayList<>(); + List <String> pullIDs = new ArrayList<>(); + List <String> delDstIDs = new ArrayList<>(); NoteInfo dnote; Date sdate, ddate; @@ -375,7 +374,7 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { return map; } - private NoteInfo containsID(List<NoteInfo> notes, String id) { + private NoteInfo containsID(List <NoteInfo> notes, String id) { for (NoteInfo note : notes) { if (note.getId().equals(id)) { return note; @@ -385,7 +384,6 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { } /** * checks latest modification date based on Paragraph fields - * * @return -Date */ private Date lastModificationDate(Note note) { @@ -413,7 +411,7 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { @Override public void close() { LOG.info("Closing all notebook storages"); - for (NotebookRepo repo : repos) { + for (NotebookRepo repo: repos) { repo.close(); } } @@ -433,7 +431,7 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { return false; } - // checkpoint to all available storages + //checkpoint to all available storages @Override public Revision checkpoint(String noteId, String checkpointMsg, AuthenticationInfo subject) throws IOException { @@ -446,24 +444,15 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { for (int i = 0; i < repoBound; i++) { try { if (isRevisionSupportedInRepo(i)) { - allRepoCheckpoints.add( - ((NotebookRepoWithVersionControl) getRepo(i)) + allRepoCheckpoints + .add(((NotebookRepoWithVersionControl) getRepo(i)) .checkpoint(noteId, checkpointMsg, subject)); } } catch (IOException e) { - LOG.warn( - "Couldn't checkpoint in {} storage with index {} for note {}", - getRepo(i).getClass().toString(), - i, - noteId); - errorMessage += - "Error on storage class " - + getRepo(i).getClass().toString() - + " with index " - + i - + " : " - + e.getMessage() - + "\n"; + LOG.warn("Couldn't checkpoint in {} storage with index {} for note {}", + getRepo(i).getClass().toString(), i, noteId); + errorMessage += "Error on storage class " + getRepo(i).getClass().toString() + + " with index " + i + " : " + e.getMessage() + "\n"; errorCount++; } } @@ -511,7 +500,7 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { public List<NotebookRepoSettingsInfo> getSettings(AuthenticationInfo subject) { List<NotebookRepoSettingsInfo> repoSettings = Collections.emptyList(); try { - repoSettings = getRepo(0).getSettings(subject); + repoSettings = getRepo(0).getSettings(subject); } catch (IOException e) { LOG.error("Cannot get notebook repo settings", e); } @@ -536,8 +525,8 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { for (int i = 0; i < repoBound; i++) { try { if (isRevisionSupportedInRepo(i)) { - currentNote = - ((NotebookRepoWithVersionControl) getRepo(i)).setNoteRevision(noteId, revId, subject); + currentNote = ((NotebookRepoWithVersionControl) getRepo(i)) + .setNoteRevision(noteId, revId, subject); } } catch (IOException e) { // already logged @@ -550,4 +539,5 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { } return revisionNote; } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoWithSettings.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoWithSettings.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoWithSettings.java index cd58e7c..e5f59da 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoWithSettings.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoWithSettings.java @@ -18,6 +18,7 @@ package org.apache.zeppelin.notebook.repo; import java.util.Collections; import java.util.List; + import org.apache.commons.lang.StringUtils; /** @@ -49,7 +50,9 @@ public class NotebookRepoWithSettings { return this.equals(EMPTY); } - /** Simple builder :). */ + /** + * Simple builder :). + */ public static class Builder { private final String name; private String className = StringUtils.EMPTY; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoWithVersionControl.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoWithVersionControl.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoWithVersionControl.java index e975d52..05c846e 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoWithVersionControl.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoWithVersionControl.java @@ -17,51 +17,53 @@ package org.apache.zeppelin.notebook.repo; -import java.io.IOException; -import java.util.List; import org.apache.commons.lang.StringUtils; import org.apache.zeppelin.annotation.ZeppelinApi; import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NoteInfo; import org.apache.zeppelin.user.AuthenticationInfo; -/** Notebook repository (persistence layer) abstraction */ +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Notebook repository (persistence layer) abstraction + */ public interface NotebookRepoWithVersionControl extends NotebookRepo { /** * chekpoint (set revision) for notebook. - * * @param noteId Id of the Notebook * @param checkpointMsg message description of the checkpoint * @return Rev * @throws IOException */ - @ZeppelinApi - public Revision checkpoint(String noteId, String checkpointMsg, AuthenticationInfo subject) - throws IOException; + @ZeppelinApi public Revision checkpoint(String noteId, String checkpointMsg, + AuthenticationInfo subject) throws IOException; /** * Get particular revision of the Notebook. - * + * * @param noteId Id of the Notebook * @param revId revision of the Notebook * @return a Notebook * @throws IOException */ - @ZeppelinApi - public Note get(String noteId, String revId, AuthenticationInfo subject) throws IOException; + @ZeppelinApi public Note get(String noteId, String revId, AuthenticationInfo subject) + throws IOException; /** * List of revisions of the given Notebook. - * + * * @param noteId id of the Notebook * @return list of revisions */ - @ZeppelinApi - public List<Revision> revisionHistory(String noteId, AuthenticationInfo subject); + @ZeppelinApi public List<Revision> revisionHistory(String noteId, AuthenticationInfo subject); /** * Set note to particular revision. - * + * * @param noteId Id of the Notebook * @param revId revision of the Notebook * @return a Notebook @@ -71,14 +73,16 @@ public interface NotebookRepoWithVersionControl extends NotebookRepo { public Note setNoteRevision(String noteId, String revId, AuthenticationInfo subject) throws IOException; - /** Represents the 'Revision' a point in life of the notebook */ + /** + * Represents the 'Revision' a point in life of the notebook + */ static class Revision { public static final Revision EMPTY = new Revision(StringUtils.EMPTY, StringUtils.EMPTY, 0); - + public String id; public String message; public int time; - + public Revision(String revId, String message, int time) { this.id = revId; this.message = message; @@ -89,4 +93,5 @@ public interface NotebookRepoWithVersionControl extends NotebookRepo { return revision == null || EMPTY.equals(revision); } } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/Instance.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/Instance.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/Instance.java index b2174b1..2767962 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/Instance.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/Instance.java @@ -16,7 +16,10 @@ */ package org.apache.zeppelin.notebook.repo.zeppelinhub.model; -/** ZeppelinHub Instance structure. */ +/** + * ZeppelinHub Instance structure. + * + */ public class Instance { public int id; public String name; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserSessionContainer.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserSessionContainer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserSessionContainer.java index ee3e8b1..7f035b1 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserSessionContainer.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserSessionContainer.java @@ -18,13 +18,17 @@ package org.apache.zeppelin.notebook.repo.zeppelinhub.model; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; + import org.apache.commons.lang.StringUtils; -/** Simple and yet dummy container for zeppelinhub session. */ +/** + * Simple and yet dummy container for zeppelinhub session. + * + */ public class UserSessionContainer { private static class Entity { public final String userSession; - + Entity(String userSession) { this.userSession = userSession; } @@ -41,7 +45,7 @@ public class UserSessionContainer { } return entry.userSession; } - + public synchronized String setSession(String principal, String userSession) { Entity entry = new Entity(userSession); sessions.put(principal, entry); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserTokenContainer.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserTokenContainer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserTokenContainer.java index 47e5df7..b594f89 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserTokenContainer.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserTokenContainer.java @@ -24,12 +24,17 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; + import org.apache.commons.lang.StringUtils; import org.apache.zeppelin.notebook.repo.zeppelinhub.rest.ZeppelinhubRestApiHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** User token manager class. */ +/** + * User token manager class. + * + */ + public class UserTokenContainer { private static final Logger LOG = LoggerFactory.getLogger(UserTokenContainer.class); private static UserTokenContainer instance = null; @@ -37,7 +42,8 @@ public class UserTokenContainer { private final ZeppelinhubRestApiHandler restApiClient; private String defaultToken; - public static UserTokenContainer init(ZeppelinhubRestApiHandler restClient, String defaultToken) { + public static UserTokenContainer init(ZeppelinhubRestApiHandler restClient, + String defaultToken) { if (instance == null) { instance = new UserTokenContainer(restClient, defaultToken); } @@ -52,7 +58,7 @@ public class UserTokenContainer { public static UserTokenContainer getInstance() { return instance; } - + public void setUserToken(String username, String token) { if (StringUtils.isBlank(username) || StringUtils.isBlank(token)) { LOG.warn("Can't set empty user token"); @@ -60,7 +66,7 @@ public class UserTokenContainer { } userTokens.put(username, token); } - + public String getUserToken(String principal) { if (StringUtils.isBlank(principal) || "anonymous".equals(principal)) { if (StringUtils.isBlank(defaultToken)) { @@ -89,7 +95,7 @@ public class UserTokenContainer { } return token; } - + public String getExistingUserToken(String principal) { if (StringUtils.isBlank(principal) || "anonymous".equals(principal)) { return StringUtils.EMPTY; @@ -100,14 +106,15 @@ public class UserTokenContainer { } return token; } - + public String removeUserToken(String username) { return userTokens.remove(username); } - + /** - * Get user default instance. From now, it will be from the first instance from the list, But - * later we can think about marking a default one and return it instead :) + * Get user default instance. + * From now, it will be from the first instance from the list, + * But later we can think about marking a default one and return it instead :) */ public String getDefaultZeppelinInstanceToken(String ticket) throws IOException { List<Instance> instances = getUserInstances(ticket); @@ -116,14 +123,14 @@ public class UserTokenContainer { } String token = instances.get(0).token; - LOG.debug( - "The following instance has been assigned {} with token {}", instances.get(0).name, token); + LOG.debug("The following instance has been assigned {} with token {}", instances.get(0).name, + token); return token; } - + /** - * Get list of user instances from Zeppelinhub. This will avoid and remove the needs of setting up - * token in zeppelin-env.sh. + * Get list of user instances from Zeppelinhub. + * This will avoid and remove the needs of setting up token in zeppelin-env.sh. */ public List<Instance> getUserInstances(String ticket) throws IOException { if (StringUtils.isBlank(ticket)) { @@ -131,11 +138,11 @@ public class UserTokenContainer { } return restApiClient.getInstances(ticket); } - + public List<String> getAllTokens() { return new ArrayList<String>(userTokens.values()); } - + public Map<String, String> getAllUserTokens() { return new HashMap<String, String>(userTokens); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/HttpProxyClient.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/HttpProxyClient.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/HttpProxyClient.java index 5891150..690a8b6 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/HttpProxyClient.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/HttpProxyClient.java @@ -22,7 +22,9 @@ import java.net.URI; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; + import javax.net.ssl.SSLContext; + import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.http.HttpHost; @@ -54,37 +56,40 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * This is http client class for the case of proxy usage jetty-client has issue with https over - * proxy for 9.2.x https://github.com/eclipse/jetty.project/issues/408 - * https://github.com/eclipse/jetty.project/issues/827 + * This is http client class for the case of proxy usage + * jetty-client has issue with https over proxy for 9.2.x + * https://github.com/eclipse/jetty.project/issues/408 + * https://github.com/eclipse/jetty.project/issues/827 + * */ + public class HttpProxyClient { private static final Logger LOG = LoggerFactory.getLogger(HttpProxyClient.class); public static final String ZEPPELIN_TOKEN_HEADER = "X-Zeppelin-Token"; - + private CloseableHttpAsyncClient client; private URI proxyUri; - + public static HttpProxyClient newInstance(URI proxyUri) { return new HttpProxyClient(proxyUri); } - + private HttpProxyClient(URI uri) { this.proxyUri = uri; - + client = getAsyncProxyHttpClient(proxyUri); client.start(); } - + public URI getProxyUri() { return proxyUri; } - + private CloseableHttpAsyncClient getAsyncProxyHttpClient(URI proxyUri) { LOG.info("Creating async proxy http client"); PoolingNHttpClientConnectionManager cm = getAsyncConnectionManager(); HttpHost proxy = new HttpHost(proxyUri.getHost(), proxyUri.getPort()); - + HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom(); if (cm != null) { clientBuilder = clientBuilder.setConnectionManager(cm); @@ -96,7 +101,7 @@ public class HttpProxyClient { clientBuilder = setRedirects(clientBuilder); return clientBuilder.build(); } - + private PoolingNHttpClientConnectionManager getAsyncConnectionManager() { ConnectingIOReactor ioReactor = null; PoolingNHttpClientConnectionManager cm = null; @@ -106,11 +111,11 @@ public class HttpProxyClient { SSLContext sslcontext = SSLContexts.createSystemDefault(); X509HostnameVerifier hostnameVerifier = new BrowserCompatHostnameVerifier(); @SuppressWarnings("deprecation") - Registry<SchemeIOSessionStrategy> sessionStrategyRegistry = - RegistryBuilder.<SchemeIOSessionStrategy>create() - .register("http", NoopIOSessionStrategy.INSTANCE) - .register("https", new SSLIOSessionStrategy(sslcontext, hostnameVerifier)) - .build(); + Registry<SchemeIOSessionStrategy> sessionStrategyRegistry = RegistryBuilder + .<SchemeIOSessionStrategy>create() + .register("http", NoopIOSessionStrategy.INSTANCE) + .register("https", new SSLIOSessionStrategy(sslcontext, hostnameVerifier)) + .build(); cm = new PoolingNHttpClientConnectionManager(ioReactor, sessionStrategyRegistry); } catch (IOReactorException e) { @@ -119,44 +124,41 @@ public class HttpProxyClient { } return cm; } - + private HttpAsyncClientBuilder setRedirects(HttpAsyncClientBuilder clientBuilder) { - clientBuilder.setRedirectStrategy( - new DefaultRedirectStrategy() { - /** Redirectable methods. */ - private String[] REDIRECT_METHODS = - new String[] { - HttpGet.METHOD_NAME, - HttpPost.METHOD_NAME, - HttpPut.METHOD_NAME, - HttpDelete.METHOD_NAME, - HttpHead.METHOD_NAME - }; - - @Override - protected boolean isRedirectable(String method) { - for (String m : REDIRECT_METHODS) { - if (m.equalsIgnoreCase(method)) { - return true; - } - } - return false; + clientBuilder.setRedirectStrategy(new DefaultRedirectStrategy() { + /** Redirectable methods. */ + private String[] REDIRECT_METHODS = new String[] { + HttpGet.METHOD_NAME, HttpPost.METHOD_NAME, + HttpPut.METHOD_NAME, HttpDelete.METHOD_NAME, HttpHead.METHOD_NAME + }; + + @Override + protected boolean isRedirectable(String method) { + for (String m : REDIRECT_METHODS) { + if (m.equalsIgnoreCase(method)) { + return true; } - }); + } + return false; + } + }); return clientBuilder; } - - public String sendToZeppelinHub(HttpRequestBase request, boolean withResponse) - throws IOException { - return withResponse ? sendAndGetResponse(request) : sendWithoutResponseBody(request); + + public String sendToZeppelinHub(HttpRequestBase request, + boolean withResponse) throws IOException { + return withResponse ? + sendAndGetResponse(request) : sendWithoutResponseBody(request); } + private String sendWithoutResponseBody(HttpRequestBase request) throws IOException { FutureCallback<HttpResponse> callback = getCallback(request); client.execute(request, callback); return StringUtils.EMPTY; } - + private String sendAndGetResponse(HttpRequestBase request) throws IOException { String data = StringUtils.EMPTY; try { @@ -167,33 +169,30 @@ public class HttpProxyClient { data = IOUtils.toString(responseContent, "UTF-8"); } } else { - LOG.error( - "ZeppelinHub {} {} returned with status {} ", - request.getMethod(), - request.getURI(), - code); + LOG.error("ZeppelinHub {} {} returned with status {} ", request.getMethod(), + request.getURI(), code); throw new IOException("Cannot perform " + request.getMethod() + " request to ZeppelinHub"); } - } catch (InterruptedException - | ExecutionException - | TimeoutException + } catch (InterruptedException | ExecutionException | TimeoutException | NullPointerException e) { throw new IOException(e); } return data; } - + private FutureCallback<HttpResponse> getCallback(final HttpRequestBase request) { return new FutureCallback<HttpResponse>() { public void completed(final HttpResponse response) { request.releaseConnection(); - LOG.info("Note {} completed with {} status", request.getMethod(), response.getStatusLine()); + LOG.info("Note {} completed with {} status", request.getMethod(), + response.getStatusLine()); } public void failed(final Exception ex) { request.releaseConnection(); - LOG.error("Note {} failed with {} message", request.getMethod(), ex.getMessage()); + LOG.error("Note {} failed with {} message", request.getMethod(), + ex.getMessage()); } public void cancelled() { @@ -202,7 +201,7 @@ public class HttpProxyClient { } }; } - + public void stop() { try { client.close(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java index 6d11a15..437386c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java @@ -16,8 +16,6 @@ */ package org.apache.zeppelin.notebook.repo.zeppelinhub.rest; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Type; @@ -28,6 +26,7 @@ import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; + import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.http.client.methods.HttpDelete; @@ -48,14 +47,20 @@ import org.eclipse.jetty.util.ssl.SslContextFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** REST API handler. */ +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + +/** + * REST API handler. + * + */ public class ZeppelinhubRestApiHandler { private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubRestApiHandler.class); public static final String ZEPPELIN_TOKEN_HEADER = "X-Zeppelin-Token"; private static final String USER_SESSION_HEADER = "X-User-Session"; private static final String DEFAULT_API_PATH = "/api/v1/zeppelin"; private static boolean PROXY_ON = false; - // TODO(xxx): possibly switch to jetty-client > 9.3.12 when adopt jvm 1.8 + //TODO(xxx): possibly switch to jetty-client > 9.3.12 when adopt jvm 1.8 private static HttpProxyClient proxyClient; private final HttpClient client; private String zepelinhubUrl; @@ -76,19 +81,15 @@ public class ZeppelinhubRestApiHandler { LOG.error("Cannot initialize ZeppelinHub REST async client", e); } } - + private void readProxyConf() { - // try reading https_proxy - String proxyHostString = - StringUtils.isBlank(System.getenv("https_proxy")) - ? System.getenv("HTTPS_PROXY") - : System.getenv("https_proxy"); + //try reading https_proxy + String proxyHostString = StringUtils.isBlank(System.getenv("https_proxy")) ? + System.getenv("HTTPS_PROXY") : System.getenv("https_proxy"); if (StringUtils.isBlank(proxyHostString)) { - // try http_proxy if no https_proxy - proxyHostString = - StringUtils.isBlank(System.getenv("http_proxy")) - ? System.getenv("HTTP_PROXY") - : System.getenv("http_proxy"); + //try http_proxy if no https_proxy + proxyHostString = StringUtils.isBlank(System.getenv("http_proxy")) ? + System.getenv("HTTP_PROXY") : System.getenv("http_proxy"); } if (!StringUtils.isBlank(proxyHostString)) { @@ -113,13 +114,12 @@ public class ZeppelinhubRestApiHandler { httpClient.setMaxConnectionsPerDestination(100); // Config considerations - // TODO(khalid): consider multi-threaded connection manager case + //TODO(khalid): consider multi-threaded connection manager case return httpClient; } /** * Fetch zeppelin instances for a given user. - * * @param ticket * @return * @throws IOException @@ -164,7 +164,7 @@ public class ZeppelinhubRestApiHandler { return sendToZeppelinHub(HttpMethod.GET, url, StringUtils.EMPTY, token, true); } } - + public String putWithResponseBody(String token, String url, String json) throws IOException { if (StringUtils.isBlank(url) || StringUtils.isBlank(json)) { LOG.error("Empty note, cannot send it to zeppelinHub"); @@ -176,7 +176,7 @@ public class ZeppelinhubRestApiHandler { return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json, token, true); } } - + public void put(String token, String jsonNote) throws IOException { if (StringUtils.isBlank(jsonNote)) { LOG.error("Cannot save empty note/string to ZeppelinHub"); @@ -195,16 +195,18 @@ public class ZeppelinhubRestApiHandler { return; } if (PROXY_ON) { - sendToZeppelinHubViaProxy( - new HttpDelete(zepelinhubUrl + argument), StringUtils.EMPTY, token, false); + sendToZeppelinHubViaProxy(new HttpDelete(zepelinhubUrl + argument), StringUtils.EMPTY, token, + false); } else { - sendToZeppelinHub( - HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, token, false); + sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, token, + false); } } - - private String sendToZeppelinHubViaProxy( - HttpRequestBase request, String json, String token, boolean withResponse) throws IOException { + + private String sendToZeppelinHubViaProxy(HttpRequestBase request, + String json, + String token, + boolean withResponse) throws IOException { request.setHeader(ZEPPELIN_TOKEN_HEADER, token); if (request.getMethod().equals(HttpPost.METHOD_NAME)) { HttpPost post = (HttpPost) request; @@ -222,39 +224,36 @@ public class ZeppelinhubRestApiHandler { } else { LOG.warn("Proxy client request was submitted while not correctly initialized"); } - return body; + return body; } - - private String sendToZeppelinHub( - HttpMethod method, String url, String json, String token, boolean withResponse) + + private String sendToZeppelinHub(HttpMethod method, + String url, + String json, + String token, + boolean withResponse) throws IOException { Request request = client.newRequest(url).method(method).header(ZEPPELIN_TOKEN_HEADER, token); if ((method.equals(HttpMethod.PUT) || method.equals(HttpMethod.POST)) && !StringUtils.isBlank(json)) { request.content(new StringContentProvider(json, "UTF-8"), "application/json;charset=UTF-8"); } - return withResponse - ? sendToZeppelinHub(request) - : sendToZeppelinHubWithoutResponseBody(request); + return withResponse ? + sendToZeppelinHub(request) : sendToZeppelinHubWithoutResponseBody(request); } - + private String sendToZeppelinHubWithoutResponseBody(Request request) throws IOException { - request.send( - new Response.CompleteListener() { - @Override - public void onComplete(Result result) { - Request req = result.getRequest(); - LOG.info( - "ZeppelinHub {} {} returned with status {}: {}", - req.getMethod(), - req.getURI(), - result.getResponse().getStatus(), - result.getResponse().getReason()); - } - }); + request.send(new Response.CompleteListener() { + @Override + public void onComplete(Result result) { + Request req = result.getRequest(); + LOG.info("ZeppelinHub {} {} returned with status {}: {}", req.getMethod(), + req.getURI(), result.getResponse().getStatus(), result.getResponse().getReason()); + } + }); return StringUtils.EMPTY; } - + private String sendToZeppelinHub(final Request request) throws IOException { InputStreamResponseListener listener = new InputStreamResponseListener(); Response response; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/security/Authentication.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/security/Authentication.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/security/Authentication.java index e038345..38d8b50 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/security/Authentication.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/security/Authentication.java @@ -1,7 +1,5 @@ package org.apache.zeppelin.notebook.repo.zeppelinhub.security; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.security.GeneralSecurityException; @@ -9,23 +7,34 @@ import java.security.Key; import java.security.SecureRandom; import java.util.Collections; import java.util.Map; + import javax.crypto.Cipher; import javax.crypto.KeyGenerator; import javax.crypto.SecretKey; import javax.crypto.spec.IvParameterSpec; import javax.crypto.spec.SecretKeySpec; + import org.apache.commons.codec.binary.Base64; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpStatus; import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; import org.apache.commons.httpclient.NameValuePair; +import org.apache.commons.httpclient.methods.GetMethod; import org.apache.commons.httpclient.methods.PostMethod; import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.notebook.socket.Message; +import org.apache.zeppelin.notebook.socket.Message.OP; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Authentication module. */ +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + +/** + * Authentication module. + * + */ public class Authentication implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(Authentication.class); private String principal = "anonymous"; @@ -66,10 +75,11 @@ public class Authentication implements Runnable { client = new HttpClient(connectionManager); this.token = token; - authEnabled = - !conf.getBoolean("ZEPPELIN_ALLOW_ANONYMOUS", ZEPPELIN_CONF_ANONYMOUS_ALLOWED, true); + authEnabled = !conf.getBoolean("ZEPPELIN_ALLOW_ANONYMOUS", + ZEPPELIN_CONF_ANONYMOUS_ALLOWED, true); - userKey = conf.getString("ZEPPELINHUB_USER_KEY", ZEPPELINHUB_USER_KEY, ""); + userKey = conf.getString("ZEPPELINHUB_USER_KEY", + ZEPPELINHUB_USER_KEY, ""); loginEndpoint = getLoginEndpoint(conf); } @@ -89,9 +99,8 @@ public class Authentication implements Runnable { public boolean isAuthenticated() { return authenticated; } - private String getLoginEndpoint(ZeppelinConfiguration conf) { - int port = conf.getInt("ZEPPELIN_PORT", "zeppelin.server.port", 8080); + int port = conf.getInt("ZEPPELIN_PORT", "zeppelin.server.port" , 8080); if (port <= 0) { port = 8080; } @@ -111,16 +120,15 @@ public class Authentication implements Runnable { if (isEmptyMap(authCredentials)) { return false; } - principal = - authCredentials.containsKey("principal") ? authCredentials.get("principal") : principal; + principal = authCredentials.containsKey("principal") ? authCredentials.get("principal") + : principal; ticket = authCredentials.containsKey("ticket") ? authCredentials.get("ticket") : ticket; roles = authCredentials.containsKey("roles") ? authCredentials.get("roles") : roles; LOG.info("Authenticated into Zeppelin as {} and roles {}", principal, roles); return true; } else { - LOG.warn( - "ZEPPELINHUB_USER_KEY isn't provided. Please provide your credentials" - + "for your instance in ZeppelinHub website and generate your key."); + LOG.warn("ZEPPELINHUB_USER_KEY isn't provided. Please provide your credentials" + + "for your instance in ZeppelinHub website and generate your key."); } } return false; @@ -132,9 +140,9 @@ public class Authentication implements Runnable { LOG.warn("ZEPPELINHUB_USER_KEY is blank"); return StringUtils.EMPTY; } - // use hashed token as a salt + //use hashed token as a salt String hashedToken = Integer.toString(token.hashCode()); - return decrypt(userKey, hashedToken); + return decrypt(userKey, hashedToken); } private String decrypt(String value, String initVector) { @@ -172,8 +180,8 @@ public class Authentication implements Runnable { int code = client.executeMethod(post); if (code == HttpStatus.SC_OK) { String content = post.getResponseBodyAsString(); - Map<String, Object> resp = - gson.fromJson(content, new TypeToken<Map<String, Object>>() {}.getType()); + Map<String, Object> resp = gson.fromJson(content, + new TypeToken<Map<String, Object>>() {}.getType()); LOG.info("Received from Zeppelin LoginRestApi : " + content); return (Map<String, String>) resp.get("body"); } else { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/Client.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/Client.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/Client.java index bbcd7dd..87a1a8f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/Client.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/Client.java @@ -22,8 +22,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Client to connect Zeppelin and ZeppelinHub via websocket API. Implemented using singleton - * pattern. + * Client to connect Zeppelin and ZeppelinHub via websocket API. + * Implemented using singleton pattern. + * */ public class Client { private static final Logger LOG = LoggerFactory.getLogger(Client.class); @@ -34,8 +35,8 @@ public class Client { private static final int MB = 1048576; private static final int MAXIMUM_NOTE_SIZE = 64 * MB; - public static Client initialize( - String zeppelinUri, String zeppelinhubUri, String token, ZeppelinConfiguration conf) { + public static Client initialize(String zeppelinUri, String zeppelinhubUri, String token, + ZeppelinConfiguration conf) { if (instance == null) { instance = new Client(zeppelinUri, zeppelinhubUri, token, conf); } @@ -46,8 +47,8 @@ public class Client { return instance; } - private Client( - String zeppelinUri, String zeppelinhubUri, String token, ZeppelinConfiguration conf) { + private Client(String zeppelinUri, String zeppelinhubUri, String token, + ZeppelinConfiguration conf) { LOG.debug("Init Client"); zeppelinhubClient = ZeppelinhubClient.initialize(zeppelinhubUri, token); zeppelinClient = ZeppelinClient.initialize(zeppelinUri, token, conf); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java index a338dae..0257b8c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java @@ -27,6 +27,7 @@ import java.util.Timer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; + import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.notebook.NotebookAuthorization; @@ -47,7 +48,11 @@ import org.eclipse.jetty.websocket.client.WebSocketClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Zeppelin websocket client. */ + +/** + * Zeppelin websocket client. + * + */ public class ZeppelinClient { private static final Logger LOG = LoggerFactory.getLogger(ZeppelinClient.class); private final URI zeppelinWebsocketUrl; @@ -62,24 +67,22 @@ public class ZeppelinClient { private static final int MIN = 60; private static final String ORIGIN = "Origin"; - private static final Set<String> actionable = - new HashSet<String>( - Arrays.asList( - // running events - "ANGULAR_OBJECT_UPDATE", - "PROGRESS", - "NOTE", - "PARAGRAPH", - "PARAGRAPH_UPDATE_OUTPUT", - "PARAGRAPH_APPEND_OUTPUT", - "PARAGRAPH_CLEAR_OUTPUT", - "PARAGRAPH_REMOVE", - // run or stop events - "RUN_PARAGRAPH", - "CANCEL_PARAGRAPH")); + private static final Set<String> actionable = new HashSet<String>(Arrays.asList( + // running events + "ANGULAR_OBJECT_UPDATE", + "PROGRESS", + "NOTE", + "PARAGRAPH", + "PARAGRAPH_UPDATE_OUTPUT", + "PARAGRAPH_APPEND_OUTPUT", + "PARAGRAPH_CLEAR_OUTPUT", + "PARAGRAPH_REMOVE", + // run or stop events + "RUN_PARAGRAPH", + "CANCEL_PARAGRAPH")); - public static ZeppelinClient initialize( - String zeppelinUrl, String token, ZeppelinConfiguration conf) { + public static ZeppelinClient initialize(String zeppelinUrl, String token, + ZeppelinConfiguration conf) { if (instance == null) { instance = new ZeppelinClient(zeppelinUrl, token, conf); } @@ -108,7 +111,7 @@ public class ZeppelinClient { client.setMaxIdleTimeout(5 * MIN * 1000); client.setMaxTextMessageBufferSize(Client.getMaxNoteSize()); client.getPolicy().setMaxTextMessageSize(Client.getMaxNoteSize()); - // TODO(khalid): other client settings + //TODO(khalid): other client settings return client; } @@ -127,28 +130,25 @@ public class ZeppelinClient { private void addRoutines() { schedulerService.add(ZeppelinHeartbeat.newInstance(this), 10, 1 * MIN); - new Timer() - .schedule( - new java.util.TimerTask() { - @Override - public void run() { - int time = 0; - while (time < 5 * MIN) { - watcherSession = openWatcherSession(); - if (watcherSession == null) { - try { - Thread.sleep(5000); - time += 5; - } catch (InterruptedException e) { - // continue - } - } else { - break; - } - } - } - }, - 5000); + new Timer().schedule(new java.util.TimerTask() { + @Override + public void run() { + int time = 0; + while (time < 5 * MIN) { + watcherSession = openWatcherSession(); + if (watcherSession == null) { + try { + Thread.sleep(5000); + time += 5; + } catch (InterruptedException e) { + //continue + } + } else { + break; + } + } + } + }, 5000); } public void stop() { @@ -192,7 +192,7 @@ public class ZeppelinClient { return null; } } - + private Session openWatcherSession() { ClientUpgradeRequest request = new ClientUpgradeRequest(); request.setHeader(WatcherSecurityKey.HTTP_HEADER, WatcherSecurityKey.getKey()); @@ -218,7 +218,7 @@ public class ZeppelinClient { } noteSession.getRemote().sendStringByFuture(serialize(msg)); } - + public Session getZeppelinConnection(String noteId, String principal, String ticket) { if (StringUtils.isBlank(noteId)) { LOG.warn("Cannot get Websocket session with blanck noteId"); @@ -226,8 +226,8 @@ public class ZeppelinClient { } return getNoteSession(noteId, principal, ticket); } - - /* + +/* private Message zeppelinGetNoteMsg(String noteId) { Message getNoteMsg = new Message(Message.OP.GET_NOTE); HashMap<String, Object> data = new HashMap<>(); @@ -247,7 +247,7 @@ public class ZeppelinClient { } return session; } - + private Session openNoteSession(String noteId, String principal, String ticket) { ClientUpgradeRequest request = new ClientUpgradeRequest(); request.setHeader(ORIGIN, "*"); @@ -272,7 +272,7 @@ public class ZeppelinClient { } return session; } - + private boolean isSessionOpen(Session session) { return (session != null) && (session.isOpen()); } @@ -289,7 +289,7 @@ public class ZeppelinClient { public void handleMsgFromZeppelin(String message, String noteId) { Map<String, String> meta = new HashMap<>(); - // TODO(khalid): don't use zeppelinhubToken in this class, decouple + //TODO(khalid): don't use zeppelinhubToken in this class, decouple meta.put("noteId", noteId); Message zeppelinMsg = deserialize(message); if (zeppelinMsg == null) { @@ -299,7 +299,7 @@ public class ZeppelinClient { if (!isActionable(zeppelinMsg.op)) { return; } - + token = UserTokenContainer.getInstance().getUserToken(zeppelinMsg.principal); Client client = Client.getInstance(); if (client == null) { @@ -312,6 +312,7 @@ public class ZeppelinClient { } else { client.relayToZeppelinHub(hubMsg.toJson(), token); } + } private void relayToAllZeppelinHub(ZeppelinhubMessage hubMsg, String noteId) { @@ -323,7 +324,7 @@ public class ZeppelinClient { Client client = Client.getInstance(); Set<String> userAndRoles; String token; - for (String user : userTokens.keySet()) { + for (String user: userTokens.keySet()) { userAndRoles = noteAuth.getRoles(user); userAndRoles.add(user); if (noteAuth.isReader(noteId, userAndRoles)) { @@ -340,7 +341,7 @@ public class ZeppelinClient { } return actionable.contains(action.name()); } - + public void removeNoteConnection(String noteId) { if (StringUtils.isBlank(noteId)) { LOG.error("Cannot remove session for empty noteId"); @@ -355,14 +356,14 @@ public class ZeppelinClient { } LOG.info("Removed note websocket connection for note {}", noteId); } - + private void removeAllConnections() { if (watcherSession != null && watcherSession.isOpen()) { watcherSession.close(); } Session noteSession = null; - for (Map.Entry<String, Session> note : notesConnection.entrySet()) { + for (Map.Entry<String, Session> note: notesConnection.entrySet()) { noteSession = note.getValue(); if (isSessionOpen(noteSession)) { noteSession.close(); @@ -378,8 +379,10 @@ public class ZeppelinClient { } watcherSession.getRemote().sendStringByFuture(serialize(new Message(OP.PING))); } - - /** Only used in test. */ + + /** + * Only used in test. + */ public int countConnectedNotes() { return notesConnection.size(); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClient.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClient.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClient.java index 098aecb..4c03a66 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClient.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClient.java @@ -16,12 +16,7 @@ */ package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket; -import com.amazonaws.util.json.JSONArray; -import com.amazonaws.util.json.JSONException; -import com.amazonaws.util.json.JSONObject; -import com.google.common.collect.Lists; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; + import java.io.IOException; import java.net.HttpCookie; import java.net.URI; @@ -30,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; + import org.apache.commons.lang.StringUtils; import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.ZeppelinhubWebsocket; import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinHubOp; @@ -48,7 +44,16 @@ import org.eclipse.jetty.websocket.client.WebSocketClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Manage a zeppelinhub websocket connection. */ +import com.amazonaws.util.json.JSONArray; +import com.amazonaws.util.json.JSONException; +import com.amazonaws.util.json.JSONObject; +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + +/** + * Manage a zeppelinhub websocket connection. + */ public class ZeppelinhubClient { private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubClient.class); @@ -60,9 +65,9 @@ public class ZeppelinhubClient { private static final long CONNECTION_IDLE_TIME = TimeUnit.SECONDS.toMillis(30); private static ZeppelinhubClient instance = null; private static Gson gson; - + private SchedulerService schedulerService; - private Map<String, ZeppelinhubSession> sessionMap = + private Map<String, ZeppelinhubSession> sessionMap = new ConcurrentHashMap<String, ZeppelinhubSession>(); public static ZeppelinhubClient initialize(String zeppelinhubUrl, String token) { @@ -93,8 +98,10 @@ public class ZeppelinhubClient { LOG.error("Cannot connect to zeppelinhub via websocket", e); } } - - public void initUser(String token) {} + + public void initUser(String token) { + + } public void stop() { LOG.info("Stopping Zeppelinhub websocket client"); @@ -113,7 +120,7 @@ public class ZeppelinhubClient { public String getToken() { return this.zeppelinhubToken; } - + public void send(String msg, String token) { ZeppelinhubSession zeppelinhubSession = getSession(token); if (!isConnectedToZeppelinhub(zeppelinhubSession)) { @@ -126,7 +133,7 @@ public class ZeppelinhubClient { } zeppelinhubSession.sendByFuture(msg); } - + private boolean isConnectedToZeppelinhub(ZeppelinhubSession zeppelinhubSession) { return (zeppelinhubSession != null && zeppelinhubSession.isSessionOpen()); } @@ -173,7 +180,7 @@ public class ZeppelinhubClient { request.setCookies(Lists.newArrayList(new HttpCookie(TOKEN_HEADER, token))); return request; } - + private WebSocketClient createNewWebsocketClient() { SslContextFactory sslContextFactory = new SslContextFactory(); WebSocketClient client = new WebSocketClient(sslContextFactory); @@ -182,7 +189,7 @@ public class ZeppelinhubClient { client.setMaxIdleTimeout(CONNECTION_IDLE_TIME); return client; } - + private void addRoutines() { schedulerService.add(ZeppelinHubHeartbeat.newInstance(this), 10, 23); } @@ -262,9 +269,8 @@ public class ZeppelinhubClient { LOG.warn("Wrong \"paragraph\" format for RUN_NOTEBOOK"); continue; } - zeppelinMsg.data = - gson.fromJson( - paragraphs.getString(i), new TypeToken<Map<String, Object>>() {}.getType()); + zeppelinMsg.data = gson.fromJson(paragraphs.getString(i), + new TypeToken<Map<String, Object>>(){}.getType()); zeppelinMsg.principal = principal; zeppelinMsg.ticket = TicketContainer.instance.getTicket(principal); client.relayToZeppelin(zeppelinMsg, noteId); @@ -276,4 +282,5 @@ public class ZeppelinhubClient { } return true; } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/WatcherWebsocket.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/WatcherWebsocket.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/WatcherWebsocket.java index 72b172e..43adf4a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/WatcherWebsocket.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/WatcherWebsocket.java @@ -27,18 +27,22 @@ import org.eclipse.jetty.websocket.api.WebSocketListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Zeppelin Watcher that will forward user note to ZeppelinHub. */ +/** + * Zeppelin Watcher that will forward user note to ZeppelinHub. + * + */ public class WatcherWebsocket implements WebSocketListener { private static final Logger LOG = LoggerFactory.getLogger(ZeppelinWebsocket.class); private static final String watcherPrincipal = "watcher"; public Session connection; - + public static WatcherWebsocket createInstace() { return new WatcherWebsocket(); } - + @Override - public void onWebSocketBinary(byte[] payload, int offset, int len) {} + public void onWebSocketBinary(byte[] payload, int offset, int len) { + } @Override public void onWebSocketClose(int code, String reason) { @@ -75,4 +79,5 @@ public class WatcherWebsocket implements WebSocketListener { LOG.error("Failed to send message to ZeppelinHub: ", e); } } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java index fb078ae..fa6ade8 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java @@ -22,7 +22,10 @@ import org.eclipse.jetty.websocket.api.WebSocketListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Zeppelin websocket listener class. */ +/** + * Zeppelin websocket listener class. + * + */ public class ZeppelinWebsocket implements WebSocketListener { private static final Logger LOG = LoggerFactory.getLogger(ZeppelinWebsocket.class); public Session connection; @@ -33,7 +36,9 @@ public class ZeppelinWebsocket implements WebSocketListener { } @Override - public void onWebSocketBinary(byte[] arg0, int arg1, int arg2) {} + public void onWebSocketBinary(byte[] arg0, int arg1, int arg2) { + + } @Override public void onWebSocketClose(int code, String message) { @@ -66,4 +71,5 @@ public class ZeppelinWebsocket implements WebSocketListener { LOG.error("Failed to send message to ZeppelinHub: {}", e.toString()); } } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinhubWebsocket.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinhubWebsocket.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinhubWebsocket.java index 713be82..216c307 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinhubWebsocket.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinhubWebsocket.java @@ -23,12 +23,14 @@ import org.eclipse.jetty.websocket.api.WebSocketListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Zeppelinhub websocket handler. */ +/** + * Zeppelinhub websocket handler. + */ public class ZeppelinhubWebsocket implements WebSocketListener { private Logger LOG = LoggerFactory.getLogger(ZeppelinhubWebsocket.class); private Session zeppelinHubSession; private final String token; - + private ZeppelinhubWebsocket(String token) { this.token = token; } @@ -36,7 +38,7 @@ public class ZeppelinhubWebsocket implements WebSocketListener { public static ZeppelinhubWebsocket newInstance(String token) { return new ZeppelinhubWebsocket(token); } - + @Override public void onWebSocketBinary(byte[] payload, int offset, int len) {} @@ -71,7 +73,7 @@ public class ZeppelinhubWebsocket implements WebSocketListener { private boolean isSessionOpen() { return ((zeppelinHubSession != null) && (zeppelinHubSession.isOpen())) ? true : false; } - + private void send(String msg) { if (isSessionOpen()) { zeppelinHubSession.getRemote().sendStringByFuture(msg); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinHubOp.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinHubOp.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinHubOp.java index b5dd1d5..80d5f06 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinHubOp.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinHubOp.java @@ -16,7 +16,9 @@ */ package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol; -/** Zeppelinhub Op. */ +/** + * Zeppelinhub Op. + */ public enum ZeppelinHubOp { LIVE, DEAD, http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessage.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessage.java index de7e7b6..4f7c652 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessage.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessage.java @@ -16,10 +16,8 @@ */ package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol; -import com.google.common.collect.Maps; -import com.google.gson.Gson; -import com.google.gson.JsonSyntaxException; import java.util.Map; + import org.apache.commons.lang.StringUtils; import org.apache.zeppelin.common.JsonSerializable; import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.Client; @@ -28,7 +26,14 @@ import org.apache.zeppelin.notebook.socket.Message.OP; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Zeppelinhub message class. */ +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; + +/** + * Zeppelinhub message class. + * + */ public class ZeppelinhubMessage implements JsonSerializable { private static final Gson gson = new Gson(); private static final Logger LOG = LoggerFactory.getLogger(Client.class); @@ -37,18 +42,18 @@ public class ZeppelinhubMessage implements JsonSerializable { public Object op; public Object data; public Map<String, String> meta = Maps.newHashMap(); - + private ZeppelinhubMessage() { this.op = OP.LIST_NOTES; this.data = null; } - + private ZeppelinhubMessage(Object op, Object data, Map<String, String> meta) { this.op = op; this.data = data; this.meta = meta; } - + public static ZeppelinhubMessage newMessage(Object op, Object data, Map<String, String> meta) { return new ZeppelinhubMessage(op, data, meta); } @@ -77,4 +82,5 @@ public class ZeppelinhubMessage implements JsonSerializable { } return msg; } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/SchedulerService.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/SchedulerService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/SchedulerService.java index 1958bf0..024a3c0 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/SchedulerService.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/SchedulerService.java @@ -20,7 +20,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -/** Creates a thread pool that can schedule zeppelinhub commands. */ +/** + * Creates a thread pool that can schedule zeppelinhub commands. + * + */ public class SchedulerService { private final ScheduledExecutorService pool; @@ -55,4 +58,5 @@ public class SchedulerService { public void close() { pool.shutdown(); } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java index d1cfed9..11cfa45 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java @@ -20,15 +20,18 @@ import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Routine that sends PING to all connected Zeppelin ws connections. */ +/** + * Routine that sends PING to all connected Zeppelin ws connections. + * + */ public class ZeppelinHeartbeat implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ZeppelinHubHeartbeat.class); private ZeppelinClient client; - + public static ZeppelinHeartbeat newInstance(ZeppelinClient client) { return new ZeppelinHeartbeat(client); } - + private ZeppelinHeartbeat(ZeppelinClient client) { this.client = client; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHubHeartbeat.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHubHeartbeat.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHubHeartbeat.java index 9a7e7a8..2282147 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHubHeartbeat.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHubHeartbeat.java @@ -22,24 +22,27 @@ import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.Zeppelinhub import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Routine that send PING event to zeppelinhub. */ +/** + * Routine that send PING event to zeppelinhub. + * + */ public class ZeppelinHubHeartbeat implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ZeppelinHubHeartbeat.class); private ZeppelinhubClient client; - + public static ZeppelinHubHeartbeat newInstance(ZeppelinhubClient client) { return new ZeppelinHubHeartbeat(client); } - + private ZeppelinHubHeartbeat(ZeppelinhubClient client) { this.client = client; } - + @Override public void run() { LOG.debug("Sending PING to zeppelinhub token"); - for (String token : UserTokenContainer.getInstance().getAllTokens()) { + for (String token: UserTokenContainer.getInstance().getAllTokens()) { client.send(ZeppelinhubUtils.pingMessage(token), token); } - } + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/session/ZeppelinhubSession.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/session/ZeppelinhubSession.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/session/ZeppelinhubSession.java index 9cb7249..86cd4ad 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/session/ZeppelinhubSession.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/session/ZeppelinhubSession.java @@ -21,33 +21,35 @@ import org.eclipse.jetty.websocket.api.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Zeppelinhub session. */ +/** + * Zeppelinhub session. + */ public class ZeppelinhubSession { private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubSession.class); private Session session; private final String token; - + public static final ZeppelinhubSession EMPTY = new ZeppelinhubSession(null, StringUtils.EMPTY); - + public static ZeppelinhubSession createInstance(Session session, String token) { return new ZeppelinhubSession(session, token); } - + private ZeppelinhubSession(Session session, String token) { this.session = session; this.token = token; } - + public boolean isSessionOpen() { return ((session != null) && (session.isOpen())); } - + public void close() { if (isSessionOpen()) { session.close(); } } - + public void sendByFuture(String msg) { if (StringUtils.isBlank(msg)) { LOG.error("Cannot send event to Zeppelinhub, msg is empty"); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/utils/ZeppelinhubUtils.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/utils/ZeppelinhubUtils.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/utils/ZeppelinhubUtils.java index b81780b..50da343 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/utils/ZeppelinhubUtils.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/utils/ZeppelinhubUtils.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils; import java.util.HashMap; + import org.apache.commons.lang.StringUtils; import org.apache.zeppelin.notebook.repo.zeppelinhub.model.UserTokenContainer; import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinhubClient; @@ -26,7 +27,10 @@ import org.apache.zeppelin.notebook.socket.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Helper class. */ +/** + * Helper class. + * + */ public class ZeppelinhubUtils { private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubUtils.class); @@ -37,10 +41,11 @@ public class ZeppelinhubUtils { } HashMap<String, Object> data = new HashMap<>(); data.put("token", token); - return ZeppelinhubMessage.newMessage(ZeppelinHubOp.LIVE, data, new HashMap<String, String>()) - .toJson(); + return ZeppelinhubMessage + .newMessage(ZeppelinHubOp.LIVE, data, new HashMap<String, String>()) + .toJson(); } - + public static String deadMessage(String token) { if (StringUtils.isBlank(token)) { LOG.error("Cannot create Dead message: token is null or empty"); @@ -48,10 +53,11 @@ public class ZeppelinhubUtils { } HashMap<String, Object> data = new HashMap<>(); data.put("token", token); - return ZeppelinhubMessage.newMessage(ZeppelinHubOp.DEAD, data, new HashMap<String, String>()) - .toJson(); + return ZeppelinhubMessage + .newMessage(ZeppelinHubOp.DEAD, data, new HashMap<String, String>()) + .toJson(); } - + public static String pingMessage(String token) { if (StringUtils.isBlank(token)) { LOG.error("Cannot create Ping message: token is null or empty"); @@ -59,8 +65,9 @@ public class ZeppelinhubUtils { } HashMap<String, Object> data = new HashMap<>(); data.put("token", token); - return ZeppelinhubMessage.newMessage(ZeppelinHubOp.PING, data, new HashMap<String, String>()) - .toJson(); + return ZeppelinhubMessage + .newMessage(ZeppelinHubOp.PING, data, new HashMap<String, String>()) + .toJson(); } public static ZeppelinHubOp toZeppelinHubOp(String text) { @@ -74,7 +81,7 @@ public class ZeppelinhubUtils { } public static boolean isZeppelinHubOp(String text) { - return (toZeppelinHubOp(text) != null); + return (toZeppelinHubOp(text) != null); } public static Message.OP toZeppelinOp(String text) { @@ -88,31 +95,33 @@ public class ZeppelinhubUtils { } public static boolean isZeppelinOp(String text) { - return (toZeppelinOp(text) != null); + return (toZeppelinOp(text) != null); } - + public static void userLoginRoutine(String username) { LOG.debug("Executing user login routine"); String token = UserTokenContainer.getInstance().getUserToken(username); UserTokenContainer.getInstance().setUserToken(username, token); String msg = ZeppelinhubUtils.liveMessage(token); - ZeppelinhubClient.getInstance().send(msg, token); + ZeppelinhubClient.getInstance() + .send(msg, token); } - + public static void userLogoutRoutine(String username) { LOG.debug("Executing user logout routine"); String token = UserTokenContainer.getInstance().removeUserToken(username); String msg = ZeppelinhubUtils.deadMessage(token); - ZeppelinhubClient.getInstance().send(msg, token); + ZeppelinhubClient.getInstance() + .send(msg, token); ZeppelinhubClient.getInstance().removeSession(token); } - - public static void userSwitchTokenRoutine( - String username, String originToken, String targetToken) { + + public static void userSwitchTokenRoutine(String username, String originToken, + String targetToken) { String offMsg = ZeppelinhubUtils.deadMessage(originToken); ZeppelinhubClient.getInstance().send(offMsg, originToken); ZeppelinhubClient.getInstance().removeSession(originToken); - + String onMsg = ZeppelinhubUtils.liveMessage(targetToken); ZeppelinhubClient.getInstance().send(onMsg, targetToken); }
