http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ServerCallbackHandler.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ServerCallbackHandler.java b/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ServerCallbackHandler.java index 1788dab..d681236 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ServerCallbackHandler.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ServerCallbackHandler.java @@ -48,26 +48,27 @@ public class ServerCallbackHandler implements CallbackHandler { private static final String SYSPROP_SUPER_PASSWORD = "storm.SASLAuthenticationProvider.superPassword"; private String userName; - private final Map<String,String> credentials = new HashMap<String,String>(); + private final Map<String, String> credentials = new HashMap<String, String>(); public ServerCallbackHandler(Configuration configuration) throws IOException { - if (configuration==null) return; + if (configuration == null) + return; AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER); if (configurationEntries == null) { - String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_SERVER+"' entry in this configuration: Server cannot start."; + String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_SERVER + "' entry in this configuration: Server cannot start."; throw new IOException(errorMessage); } credentials.clear(); - for(AppConfigurationEntry entry: configurationEntries) { - Map<String,?> options = entry.getOptions(); + for (AppConfigurationEntry entry : configurationEntries) { + Map<String, ?> options = entry.getOptions(); // Populate DIGEST-MD5 user -> password map with JAAS configuration entries from the "Server" section. // Usernames are distinguished from other options by prefixing the username with a "user_" prefix. - for(Map.Entry<String, ?> pair : options.entrySet()) { + for (Map.Entry<String, ?> pair : options.entrySet()) { String key = pair.getKey(); if (key.startsWith(USER_PREFIX)) { String userName = key.substring(USER_PREFIX.length()); - credentials.put(userName,(String)pair.getValue()); + credentials.put(userName, (String) pair.getValue()); } } } @@ -98,7 +99,7 @@ public class ServerCallbackHandler implements CallbackHandler { if ("super".equals(this.userName) && System.getProperty(SYSPROP_SUPER_PASSWORD) != null) { // superuser: use Java system property for password, if available. pc.setPassword(System.getProperty(SYSPROP_SUPER_PASSWORD).toCharArray()); - } else if (credentials.containsKey(userName) ) { + } else if (credentials.containsKey(userName)) { pc.setPassword(credentials.get(userName).toCharArray()); } else { LOG.warn("No password found for user: " + userName); @@ -106,7 +107,7 @@ public class ServerCallbackHandler implements CallbackHandler { } private void handleRealmCallback(RealmCallback rc) { - LOG.debug("handleRealmCallback: "+ rc.getDefaultText()); + LOG.debug("handleRealmCallback: " + rc.getDefaultText()); rc.setText(rc.getDefaultText()); } @@ -114,14 +115,14 @@ public class ServerCallbackHandler implements CallbackHandler { String authenticationID = ac.getAuthenticationID(); LOG.info("Successfully authenticated client: authenticationID = " + authenticationID + " authorizationID = " + ac.getAuthorizationID()); - //if authorizationId is not set, set it to authenticationId. - if(ac.getAuthorizationID() == null) { + // if authorizationId is not set, set it to authenticationId. + if (ac.getAuthorizationID() == null) { ac.setAuthorizedID(authenticationID); } - //When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We - //add the authNid as the real user in reqContext's subject which will be used during authorization. - if(!authenticationID.equals(ac.getAuthorizationID())) { + // When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We + // add the authNid as the real user in reqContext's subject which will be used during authorization. + if (!authenticationID.equals(ac.getAuthorizationID())) { LOG.info("Impersonation attempt authenticationID = " + ac.getAuthenticationID() + " authorizationID = " + ac.getAuthorizationID()); ReqContext.context().setRealPrincipal(new SaslTransportPlugin.User(ac.getAuthenticationID())); }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGT.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGT.java b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGT.java index aed1c4f..116febb 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGT.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGT.java @@ -63,7 +63,7 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer { private static KerberosTicket getTGT(Subject subject) { Set<KerberosTicket> tickets = subject.getPrivateCredentials(KerberosTicket.class); - for(KerberosTicket ticket: tickets) { + for (KerberosTicket ticket : tickets) { KerberosPrincipal server = ticket.getServer(); if (server.getName().equals("krbtgt/" + server.getRealm() + "@" + server.getRealm())) { tickets = null; @@ -72,26 +72,26 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer { } tickets = null; return null; - } + } @Override public void populateCredentials(Map<String, String> credentials) { - //Log the user in and get the TGT + // Log the user in and get the TGT try { Configuration login_conf = AuthUtils.GetConfiguration(conf); ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf); - - //login our user - Configuration.setConfiguration(login_conf); + + // login our user + Configuration.setConfiguration(login_conf); LoginContext lc = new LoginContext(AuthUtils.LOGIN_CONTEXT_CLIENT, client_callback_handler); try { lc.login(); final Subject subject = lc.getSubject(); KerberosTicket tgt = getTGT(subject); - if (tgt == null) { //error - throw new RuntimeException("Fail to verify user principal with section \"" - +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ login_conf); + if (tgt == null) { // error + throw new RuntimeException("Fail to verify user principal with section \"" + AuthUtils.LOGIN_CONTEXT_CLIENT + + "\" in login configuration file " + login_conf); } if (!tgt.isForwardable()) { @@ -102,7 +102,7 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer { throw new RuntimeException("The TGT found is not renewable"); } - LOG.info("Pushing TGT for "+tgt.getClient()+" to topology."); + LOG.info("Pushing TGT for " + tgt.getClient() + " to topology."); saveTGT(tgt, credentials); } finally { lc.logout(); @@ -131,7 +131,7 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer { try { ByteArrayInputStream bin = new ByteArrayInputStream(DatatypeConverter.parseBase64Binary(credentials.get("TGT"))); ObjectInputStream in = new ObjectInputStream(bin); - ret = (KerberosTicket)in.readObject(); + ret = (KerberosTicket) in.readObject(); in.close(); } catch (Exception e) { throw new RuntimeException(e); @@ -155,16 +155,16 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer { KerberosTicket tgt = getTGT(credentials); if (tgt != null) { Set<Object> creds = subject.getPrivateCredentials(); - synchronized(creds) { + synchronized (creds) { Iterator<Object> iterator = creds.iterator(); while (iterator.hasNext()) { Object o = iterator.next(); if (o instanceof KerberosTicket) { - KerberosTicket t = (KerberosTicket)o; + KerberosTicket t = (KerberosTicket) o; iterator.remove(); try { t.destroy(); - } catch (DestroyFailedException e) { + } catch (DestroyFailedException e) { LOG.warn("Failed to destory ticket ", e); } } @@ -179,8 +179,8 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer { } /** - * Hadoop does not just go off of a TGT, it needs a bit more. This - * should fill in the rest. + * Hadoop does not just go off of a TGT, it needs a bit more. This should fill in the rest. + * * @param subject the subject that should have a TGT in it. */ private void loginHadoopUser(Subject subject) { @@ -193,23 +193,21 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer { } try { Method isSecEnabled = ugi.getMethod("isSecurityEnabled"); - if (!((Boolean)isSecEnabled.invoke(null))) { - LOG.warn("Hadoop is on the classpath but not configured for " + - "security, if you want security you need to be sure that " + - "hadoop.security.authentication=kerberos in core-site.xml " + - "in your jar"); + if (!((Boolean) isSecEnabled.invoke(null))) { + LOG.warn("Hadoop is on the classpath but not configured for " + "security, if you want security you need to be sure that " + + "hadoop.security.authentication=kerberos in core-site.xml " + "in your jar"); return; } - + try { Method login = ugi.getMethod("loginUserFromSubject", Subject.class); login.invoke(null, subject); } catch (NoSuchMethodException me) { - //The version of Hadoop does not have the needed client changes. + // The version of Hadoop does not have the needed client changes. // So don't look now, but do something really ugly to work around it. // This is because we are reaching into the hidden bits of Hadoop security, and it works for now, but may stop at any point in time. - //We are just trying to do the following + // We are just trying to do the following // Configuration conf = new Configuration(); // HadoopKerberosName.setConfiguration(conf); // subject.getPrincipals().add(new User(tgt.getClient().toString(), AuthenticationMethod.KERBEROS, null)); @@ -220,7 +218,7 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer { Constructor confCons = confClass.getConstructor(); Object conf = confCons.newInstance(); Class<?> hknClass = Class.forName("org.apache.hadoop.security.HadoopKerberosName"); - Method hknSetConf = hknClass.getMethod("setConfiguration",confClass); + Method hknSetConf = hknClass.getMethod("setConfiguration", confClass); hknSetConf.invoke(null, conf); Class<?> authMethodClass = Class.forName("org.apache.hadoop.security.UserGroupInformation$AuthenticationMethod"); @@ -236,7 +234,7 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer { Constructor userCons = userClass.getConstructor(String.class, authMethodClass, LoginContext.class); userCons.setAccessible(true); Object user = userCons.newInstance(name, kerbAuthMethod, null); - subject.getPrincipals().add((Principal)user); + subject.getPrincipals().add((Principal) user); } } catch (Exception e) { LOG.warn("Something went wrong while trying to initialize Hadoop through reflection. This version of hadoop may not be compatible.", e); @@ -250,14 +248,14 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer { } @Override - public void renew(Map<String,String> credentials, Map topologyConf) { + public void renew(Map<String, String> credentials, Map topologyConf) { KerberosTicket tgt = getTGT(credentials); if (tgt != null) { long refreshTime = getRefreshTime(tgt); long now = System.currentTimeMillis(); if (now >= refreshTime) { try { - LOG.info("Renewing TGT for "+tgt.getClient()); + LOG.info("Renewing TGT for " + tgt.getClient()); tgt.refresh(); saveTGT(tgt, credentials); } catch (RefreshFailedException e) { @@ -272,10 +270,10 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer { Map conf = new java.util.HashMap(); conf.put("java.security.auth.login.config", args[0]); at.prepare(conf); - Map<String,String> creds = new java.util.HashMap<String,String>(); + Map<String, String> creds = new java.util.HashMap<String, String>(); at.populateCredentials(creds); Subject s = new Subject(); at.populateSubject(s, creds); - LOG.info("Got a Subject "+s); + LOG.info("Got a Subject " + s); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java index 807abe3..647e240 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java @@ -29,7 +29,6 @@ import javax.security.auth.kerberos.KerberosTicket; import javax.security.auth.login.LoginException; import javax.security.auth.spi.LoginModule; - /** * Custom LoginModule to enable Auto Login based on cached ticket */ @@ -41,10 +40,7 @@ public class AutoTGTKrb5LoginModule implements LoginModule { protected KerberosTicket kerbTicket = null; - public void initialize(Subject subject, - CallbackHandler callbackHandler, - Map<String, ?> sharedState, - Map<String, ?> options) { + public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) { this.subject = subject; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java index ba34fc9..6188566 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java @@ -31,7 +31,7 @@ public class AutoTGTKrb5LoginModuleTest extends AutoTGTKrb5LoginModule { public void setKerbTicket(KerberosTicket ticket) { this.kerbTicket = ticket; } - + @Override protected void getKerbTicketFromCache() { // Do nothing. http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java index d46aa8b..13a2cba 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java @@ -49,11 +49,11 @@ public class ClientCallbackHandler implements CallbackHandler { * @throws IOException */ public ClientCallbackHandler(Configuration configuration) throws IOException { - if (configuration == null) return; + if (configuration == null) + return; AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_CLIENT); if (configurationEntries == null) { - String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_CLIENT - + "' entry in this configuration: Client cannot start."; + String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_CLIENT + "' entry in this configuration: Client cannot start."; LOG.error(errorMessage); throw new IOException(errorMessage); } @@ -61,7 +61,8 @@ public class ClientCallbackHandler implements CallbackHandler { /** * This method is invoked by SASL for authentication challenges - * @param callbacks a collection of challenge callbacks + * + * @param callbacks a collection of challenge callbacks */ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { for (Callback c : callbacks) { @@ -69,20 +70,18 @@ public class ClientCallbackHandler implements CallbackHandler { LOG.debug("name callback"); } else if (c instanceof PasswordCallback) { LOG.debug("password callback"); - LOG.warn("Could not login: the client is being asked for a password, but the " + - " client code does not currently support obtaining a password from the user." + - " Make sure that the client is configured to use a ticket cache (using" + - " the JAAS configuration setting 'useTicketCache=true)' and restart the client. If" + - " you still get this message after that, the TGT in the ticket cache has expired and must" + - " be manually refreshed. To do so, first determine if you are using a password or a" + - " keytab. If the former, run kinit in a Unix shell in the environment of the user who" + - " is running this client using the command" + - " 'kinit <princ>' (where <princ> is the name of the client's Kerberos principal)." + - " If the latter, do" + - " 'kinit -k -t <keytab> <princ>' (where <princ> is the name of the Kerberos principal, and" + - " <keytab> is the location of the keytab file). After manually refreshing your cache," + - " restart this client. If you continue to see this message after manually refreshing" + - " your cache, ensure that your KDC host's clock is in sync with this host's clock."); + LOG.warn("Could not login: the client is being asked for a password, but the " + + " client code does not currently support obtaining a password from the user." + + " Make sure that the client is configured to use a ticket cache (using" + + " the JAAS configuration setting 'useTicketCache=true)' and restart the client. If" + + " you still get this message after that, the TGT in the ticket cache has expired and must" + + " be manually refreshed. To do so, first determine if you are using a password or a" + + " keytab. If the former, run kinit in a Unix shell in the environment of the user who" + " is running this client using the command" + + " 'kinit <princ>' (where <princ> is the name of the client's Kerberos principal)." + " If the latter, do" + + " 'kinit -k -t <keytab> <princ>' (where <princ> is the name of the Kerberos principal, and" + + " <keytab> is the location of the keytab file). After manually refreshing your cache," + + " restart this client. If you continue to see this message after manually refreshing" + + " your cache, ensure that your KDC host's clock is in sync with this host's clock."); } else if (c instanceof AuthorizeCallback) { LOG.debug("authorization callback"); AuthorizeCallback ac = (AuthorizeCallback) c; @@ -96,7 +95,7 @@ public class ClientCallbackHandler implements CallbackHandler { if (ac.isAuthorized()) { ac.setAuthorizedID(authzid); } - } else { + } else { throw new UnsupportedCallbackException(c); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java index ecb0daf..e257a8a 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java @@ -48,19 +48,19 @@ import backtype.storm.security.auth.AuthUtils; import backtype.storm.security.auth.SaslTransportPlugin; public class KerberosSaslTransportPlugin extends SaslTransportPlugin { - public static final String KERBEROS = "GSSAPI"; + public static final String KERBEROS = "GSSAPI"; private static final Logger LOG = LoggerFactory.getLogger(KerberosSaslTransportPlugin.class); public TTransportFactory getServerTransportFactory() throws IOException { - //create an authentication callback handler + // create an authentication callback handler CallbackHandler server_callback_handler = new ServerCallbackHandler(login_conf, storm_conf); - - //login our principal + + // login our principal Subject subject = null; try { - //specify a configuration object to be used - Configuration.setConfiguration(login_conf); - //now login + // specify a configuration object to be used + Configuration.setConfiguration(login_conf); + // now login Login login = new Login(AuthUtils.LOGIN_CONTEXT_SERVER, server_callback_handler); subject = login.getSubject(); } catch (LoginException ex) { @@ -68,27 +68,27 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin { throw new RuntimeException(ex); } - //check the credential of our principal - if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { - throw new RuntimeException("Fail to verify user principal with section \"" - +AuthUtils.LOGIN_CONTEXT_SERVER+"\" in login configuration file "+ login_conf); + // check the credential of our principal + if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { + throw new RuntimeException("Fail to verify user principal with section \"" + AuthUtils.LOGIN_CONTEXT_SERVER + "\" in login configuration file " + + login_conf); } - String principal = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_SERVER, "principal"); - LOG.debug("principal:"+principal); + String principal = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_SERVER, "principal"); + LOG.debug("principal:" + principal); KerberosName serviceKerberosName = new KerberosName(principal); String serviceName = serviceKerberosName.getServiceName(); String hostName = serviceKerberosName.getHostName(); - Map<String, String> props = new TreeMap<String,String>(); + Map<String, String> props = new TreeMap<String, String>(); props.put(Sasl.QOP, "auth"); props.put(Sasl.SERVER_AUTH, "false"); - //create a transport factory that will invoke our auth callback for digest + // create a transport factory that will invoke our auth callback for digest TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory(); factory.addServerDefinition(KERBEROS, serviceName, hostName, props, server_callback_handler); - //create a wrap transport factory so that we could apply user credential during connections - TUGIAssumingTransportFactory wrapFactory = new TUGIAssumingTransportFactory(factory, subject); + // create a wrap transport factory so that we could apply user credential during connections + TUGIAssumingTransportFactory wrapFactory = new TUGIAssumingTransportFactory(factory, subject); LOG.info("SASL GSSAPI transport factory will be used"); return wrapFactory; @@ -96,55 +96,47 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin { @Override public TTransport connect(TTransport transport, String serverHost, String asUser) throws TTransportException, IOException { - //create an authentication callback handler + // create an authentication callback handler ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf); - - //login our user + + // login our user Login login = null; - try { - //specify a configuration object to be used - Configuration.setConfiguration(login_conf); - //now login - login = new Login(AuthUtils.LOGIN_CONTEXT_CLIENT, client_callback_handler); + try { + // specify a configuration object to be used + Configuration.setConfiguration(login_conf); + // now login + login = new Login(AuthUtils.LOGIN_CONTEXT_CLIENT, client_callback_handler); } catch (LoginException ex) { LOG.error("Server failed to login in principal:" + ex, ex); throw new RuntimeException(ex); } final Subject subject = login.getSubject(); - if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //error - throw new RuntimeException("Fail to verify user principal with section \"" - +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ login_conf); + if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { // error + throw new RuntimeException("Fail to verify user principal with section \"" + AuthUtils.LOGIN_CONTEXT_CLIENT + "\" in login configuration file " + + login_conf); } final String principal = StringUtils.isBlank(asUser) ? getPrincipal(subject) : asUser; String serviceName = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_CLIENT, "serviceName"); if (serviceName == null) { - serviceName = AuthUtils.SERVICE; + serviceName = AuthUtils.SERVICE; } - Map<String, String> props = new TreeMap<String,String>(); + Map<String, String> props = new TreeMap<String, String>(); props.put(Sasl.QOP, "auth"); props.put(Sasl.SERVER_AUTH, "false"); LOG.debug("SASL GSSAPI client transport is being established"); - final TTransport sasalTransport = new TSaslClientTransport(KERBEROS, - principal, - serviceName, - serverHost, - props, - null, - transport); - - //open Sasl transport with the login credential + final TTransport sasalTransport = new TSaslClientTransport(KERBEROS, principal, serviceName, serverHost, props, null, transport); + + // open Sasl transport with the login credential try { - Subject.doAs(subject, - new PrivilegedExceptionAction<Void>() { + Subject.doAs(subject, new PrivilegedExceptionAction<Void>() { public Void run() { try { - LOG.debug("do as:"+ principal); + LOG.debug("do as:" + principal); sasalTransport.open(); - } - catch (Exception e) { + } catch (Exception e) { LOG.error("Client failed to open SaslClientTransport to interact with a server during session initiation: " + e, e); } return null; @@ -158,19 +150,18 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin { } private String getPrincipal(Subject subject) { - Set<Principal> principals = (Set<Principal>)subject.getPrincipals(); - if (principals==null || principals.size()<1) { + Set<Principal> principals = (Set<Principal>) subject.getPrincipals(); + if (principals == null || principals.size() < 1) { LOG.info("No principal found in login subject"); return null; } - return ((Principal)(principals.toArray()[0])).getName(); + return ((Principal) (principals.toArray()[0])).getName(); } - /** A TransportFactory that wraps another one, but assumes a specified UGI - * before calling through. - * - * This is used on the server side to assume the server's Principal when accepting - * clients. + /** + * A TransportFactory that wraps another one, but assumes a specified UGI before calling through. + * + * This is used on the server side to assume the server's Principal when accepting clients. */ static class TUGIAssumingTransportFactory extends TTransportFactory { private final Subject subject; @@ -180,21 +171,19 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin { this.wrapped = wrapped; this.subject = subject; - Set<Principal> principals = (Set<Principal>)subject.getPrincipals(); - if (principals.size()>0) - LOG.info("Service principal:"+ ((Principal)(principals.toArray()[0])).getName()); + Set<Principal> principals = (Set<Principal>) subject.getPrincipals(); + if (principals.size() > 0) + LOG.info("Service principal:" + ((Principal) (principals.toArray()[0])).getName()); } @Override public TTransport getTransport(final TTransport trans) { try { - return Subject.doAs(subject, - new PrivilegedExceptionAction<TTransport>() { + return Subject.doAs(subject, new PrivilegedExceptionAction<TTransport>() { public TTransport run() { try { return wrapped.getTransport(trans); - } - catch (Exception e) { + } catch (Exception e) { LOG.error("Storm server failed to open transport to interact with a client during session initiation: " + e, e); return null; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java index 7b143f0..0e32e0b 100755 --- a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java +++ b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java @@ -41,11 +41,12 @@ public class ServerCallbackHandler implements CallbackHandler { private String userName; public ServerCallbackHandler(Configuration configuration, Map stormConf) throws IOException { - if (configuration==null) return; + if (configuration == null) + return; AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER); if (configurationEntries == null) { - String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_SERVER+"' entry in this configuration: Server cannot start."; + String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_SERVER + "' entry in this configuration: Server cannot start."; LOG.error(errorMessage); throw new IOException(errorMessage); } @@ -78,14 +79,14 @@ public class ServerCallbackHandler implements CallbackHandler { String authenticationID = ac.getAuthenticationID(); LOG.info("Successfully authenticated client: authenticationID=" + authenticationID + " authorizationID= " + ac.getAuthorizationID()); - //if authorizationId is not set, set it to authenticationId. - if(ac.getAuthorizationID() == null) { + // if authorizationId is not set, set it to authenticationId. + if (ac.getAuthorizationID() == null) { ac.setAuthorizedID(authenticationID); } - //When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We - //add the authNid as the real user in reqContext's subject which will be used during authorization. - if(!ac.getAuthenticationID().equals(ac.getAuthorizationID())) { + // When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We + // add the authNid as the real user in reqContext's subject which will be used during authorization. + if (!ac.getAuthenticationID().equals(ac.getAuthorizationID())) { ReqContext.context().setRealPrincipal(new SaslTransportPlugin.User(ac.getAuthenticationID())); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/serialization/BlowfishTupleSerializer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/security/serialization/BlowfishTupleSerializer.java b/jstorm-core/src/main/java/backtype/storm/security/serialization/BlowfishTupleSerializer.java index 8e66cdf..437cdbb 100644 --- a/jstorm-core/src/main/java/backtype/storm/security/serialization/BlowfishTupleSerializer.java +++ b/jstorm-core/src/main/java/backtype/storm/security/serialization/BlowfishTupleSerializer.java @@ -40,8 +40,9 @@ import backtype.storm.Config; */ public class BlowfishTupleSerializer extends Serializer<ListDelegate> { /** - * The secret key (if any) for data encryption by blowfish payload serialization factory (BlowfishSerializationFactory). - * You should use in via "storm -c topology.tuple.serializer.blowfish.key=YOURKEY -c topology.tuple.serializer=backtype.storm.security.serialization.BlowfishTupleSerializer jar ...". + * The secret key (if any) for data encryption by blowfish payload serialization factory (BlowfishSerializationFactory). You should use in via + * "storm -c topology.tuple.serializer.blowfish.key=YOURKEY -c topology.tuple.serializer=backtype.storm.security.serialization.BlowfishTupleSerializer jar ..." + * . */ public static String SECRET_KEY = "topology.tuple.serializer.blowfish.key"; private static final Logger LOG = LoggerFactory.getLogger(BlowfishTupleSerializer.class); @@ -50,12 +51,12 @@ public class BlowfishTupleSerializer extends Serializer<ListDelegate> { public BlowfishTupleSerializer(Kryo kryo, Map storm_conf) { String encryption_key = null; try { - encryption_key = (String)storm_conf.get(SECRET_KEY); + encryption_key = (String) storm_conf.get(SECRET_KEY); LOG.debug("Blowfish serializer being constructed ..."); if (encryption_key == null) { throw new RuntimeException("Blowfish encryption key not specified"); } - byte[] bytes = Hex.decodeHex(encryption_key.toCharArray()); + byte[] bytes = Hex.decodeHex(encryption_key.toCharArray()); _serializer = new BlowfishSerializer(new ListDelegateSerializer(), bytes); } catch (org.apache.commons.codec.DecoderException ex) { throw new RuntimeException("Blowfish encryption key invalid", ex); @@ -69,22 +70,23 @@ public class BlowfishTupleSerializer extends Serializer<ListDelegate> { @Override public ListDelegate read(Kryo kryo, Input input, Class<ListDelegate> type) { - return (ListDelegate)_serializer.read(kryo, input, type); + return (ListDelegate) _serializer.read(kryo, input, type); } /** * Produce a blowfish key to be used in "Storm jar" command */ public static void main(String[] args) { - try{ + try { KeyGenerator kgen = KeyGenerator.getInstance("Blowfish"); SecretKey skey = kgen.generateKey(); byte[] raw = skey.getEncoded(); String keyString = new String(Hex.encodeHex(raw)); - System.out.println("storm -c "+SECRET_KEY+"="+keyString+" -c "+Config.TOPOLOGY_TUPLE_SERIALIZER+"="+BlowfishTupleSerializer.class.getName() + " ..." ); + System.out.println("storm -c " + SECRET_KEY + "=" + keyString + " -c " + Config.TOPOLOGY_TUPLE_SERIALIZER + "=" + + BlowfishTupleSerializer.class.getName() + " ..."); } catch (Exception ex) { LOG.error(ex.getMessage()); ex.printStackTrace(); } - } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/DefaultKryoFactory.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/DefaultKryoFactory.java b/jstorm-core/src/main/java/backtype/storm/serialization/DefaultKryoFactory.java index a055eb2..91e629a 100755 --- a/jstorm-core/src/main/java/backtype/storm/serialization/DefaultKryoFactory.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/DefaultKryoFactory.java @@ -22,30 +22,29 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; import java.util.Map; - public class DefaultKryoFactory implements IKryoFactory { public static class KryoSerializableDefault extends Kryo { boolean _override = false; - + public void overrideDefault(boolean value) { _override = value; - } - + } + @Override public Serializer getDefaultSerializer(Class type) { - if(_override) { + if (_override) { return new SerializableSerializer(); } else { return super.getDefaultSerializer(type); } - } - } - + } + } + @Override public Kryo getKryo(Map conf) { KryoSerializableDefault k = new KryoSerializableDefault(); - k.setRegistrationRequired(!((Boolean) conf.get(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION))); + k.setRegistrationRequired((Boolean) conf.get(Config.TOPOLOGY_KRYO_REGISTER_REQUIRED)); k.setReferences(false); return k; } @@ -53,12 +52,12 @@ public class DefaultKryoFactory implements IKryoFactory { @Override public void preRegister(Kryo k, Map conf) { } - + public void postRegister(Kryo k, Map conf) { - ((KryoSerializableDefault)k).overrideDefault(true); + ((KryoSerializableDefault) k).overrideDefault((Boolean) conf.get(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION)); } @Override - public void postDecorate(Kryo k, Map conf) { - } + public void postDecorate(Kryo k, Map conf) { + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/DefaultSerializationDelegate.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/DefaultSerializationDelegate.java b/jstorm-core/src/main/java/backtype/storm/serialization/DefaultSerializationDelegate.java index 6d986af..c97470f 100755 --- a/jstorm-core/src/main/java/backtype/storm/serialization/DefaultSerializationDelegate.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/DefaultSerializationDelegate.java @@ -48,10 +48,10 @@ public class DefaultSerializationDelegate implements SerializationDelegate { ObjectInputStream ois = new ObjectInputStream(bis); Object ret = ois.readObject(); ois.close(); - return (T)ret; - } catch(IOException ioe) { + return (T) ret; + } catch (IOException ioe) { throw new RuntimeException(ioe); - } catch(ClassNotFoundException e) { + } catch (ClassNotFoundException e) { throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeSerializationDelegate.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeSerializationDelegate.java b/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeSerializationDelegate.java index c8377c3..4b7951e 100755 --- a/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeSerializationDelegate.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeSerializationDelegate.java @@ -22,8 +22,8 @@ import java.util.zip.GZIPInputStream; /** * Always writes gzip out, but tests incoming to see if it's gzipped. If it is, deserializes with gzip. If not, uses - * {@link backtype.storm.serialization.DefaultSerializationDelegate} to deserialize. Any logic needing to be enabled - * via {@link #prepare(java.util.Map)} is passed through to both delegates. + * {@link DefaultSerializationDelegate} to deserialize. Any logic needing to be enabled via {@link #prepare(Map)} is + * passed through to both delegates. */ @Deprecated public class GzipBridgeSerializationDelegate implements SerializationDelegate { @@ -47,7 +47,7 @@ public class GzipBridgeSerializationDelegate implements SerializationDelegate { if (isGzipped(bytes)) { return gzipDelegate.deserialize(bytes, clazz); } else { - return defaultDelegate.deserialize(bytes,clazz); + return defaultDelegate.deserialize(bytes, clazz); } } @@ -59,7 +59,6 @@ public class GzipBridgeSerializationDelegate implements SerializationDelegate { * Looks ahead to see if the GZIP magic constant is heading {@code bytes} */ private boolean isGzipped(byte[] bytes) { - return (bytes.length > 1) && (bytes[0] == GZIP_MAGIC_FIRST_BYTE) - && (bytes[1] == GZIP_MAGIC_SECOND_BYTE); + return (bytes.length > 1) && (bytes[0] == GZIP_MAGIC_FIRST_BYTE) && (bytes[1] == GZIP_MAGIC_SECOND_BYTE); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeThriftSerializationDelegate.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeThriftSerializationDelegate.java b/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeThriftSerializationDelegate.java index e5e77c3..6d580db 100755 --- a/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeThriftSerializationDelegate.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeThriftSerializationDelegate.java @@ -22,8 +22,8 @@ import java.util.zip.GZIPInputStream; /** * Always writes gzip out, but tests incoming to see if it's gzipped. If it is, deserializes with gzip. If not, uses - * {@link backtype.storm.serialization.ThriftSerializationDelegate} to deserialize. Any logic needing to be enabled - * via {@link #prepare(java.util.Map)} is passed through to both delegates. + * {@link ThriftSerializationDelegate} to deserialize. Any logic needing to be enabled via {@link #prepare(Map)} is + * passed through to both delegates. */ public class GzipBridgeThriftSerializationDelegate implements SerializationDelegate { @@ -46,7 +46,7 @@ public class GzipBridgeThriftSerializationDelegate implements SerializationDeleg if (isGzipped(bytes)) { return gzipDelegate.deserialize(bytes, clazz); } else { - return defaultDelegate.deserialize(bytes,clazz); + return defaultDelegate.deserialize(bytes, clazz); } } @@ -58,7 +58,6 @@ public class GzipBridgeThriftSerializationDelegate implements SerializationDeleg * Looks ahead to see if the GZIP magic constant is heading {@code bytes} */ private boolean isGzipped(byte[] bytes) { - return (bytes.length > 1) && (bytes[0] == GZIP_MAGIC_FIRST_BYTE) - && (bytes[1] == GZIP_MAGIC_SECOND_BYTE); + return (bytes.length > 1) && (bytes[0] == GZIP_MAGIC_FIRST_BYTE) && (bytes[1] == GZIP_MAGIC_SECOND_BYTE); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/GzipSerializationDelegate.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/GzipSerializationDelegate.java b/jstorm-core/src/main/java/backtype/storm/serialization/GzipSerializationDelegate.java index 3c8ee8b..2b27af0 100755 --- a/jstorm-core/src/main/java/backtype/storm/serialization/GzipSerializationDelegate.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/GzipSerializationDelegate.java @@ -54,10 +54,10 @@ public class GzipSerializationDelegate implements SerializationDelegate { ObjectInputStream ois = new ObjectInputStream(gis); Object ret = ois.readObject(); ois.close(); - return (T)ret; - } catch(IOException ioe) { + return (T) ret; + } catch (IOException ioe) { throw new RuntimeException(ioe); - } catch(ClassNotFoundException e) { + } catch (ClassNotFoundException e) { throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/GzipThriftSerializationDelegate.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/GzipThriftSerializationDelegate.java b/jstorm-core/src/main/java/backtype/storm/serialization/GzipThriftSerializationDelegate.java index 933a125..a76c080 100755 --- a/jstorm-core/src/main/java/backtype/storm/serialization/GzipThriftSerializationDelegate.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/GzipThriftSerializationDelegate.java @@ -49,7 +49,7 @@ public class GzipThriftSerializationDelegate implements SerializationDelegate { try { TBase instance = (TBase) clazz.newInstance(); new TDeserializer().deserialize(instance, Utils.gunzip(bytes)); - return (T)instance; + return (T) instance; } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/IKryoDecorator.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/IKryoDecorator.java b/jstorm-core/src/main/java/backtype/storm/serialization/IKryoDecorator.java index b154a36..36e59a5 100755 --- a/jstorm-core/src/main/java/backtype/storm/serialization/IKryoDecorator.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/IKryoDecorator.java @@ -16,6 +16,7 @@ * limitations under the License. */ package backtype.storm.serialization; + import com.esotericsoftware.kryo.Kryo; public interface IKryoDecorator { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/IKryoFactory.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/IKryoFactory.java b/jstorm-core/src/main/java/backtype/storm/serialization/IKryoFactory.java index 60a847d..b5c4522 100755 --- a/jstorm-core/src/main/java/backtype/storm/serialization/IKryoFactory.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/IKryoFactory.java @@ -21,20 +21,18 @@ import com.esotericsoftware.kryo.Kryo; import java.util.Map; /** - * An interface that controls the Kryo instance used by Storm for serialization. - * The lifecycle is: + * An interface that controls the Kryo instance used by Storm for serialization. The lifecycle is: * - * 1. The Kryo instance is constructed using getKryo - * 2. Storm registers the default classes (e.g. arrays, lists, maps, etc.) - * 3. Storm calls preRegister hook - * 4. Storm registers all user-defined registrations through topology.kryo.register - * 5. Storm calls postRegister hook - * 6. Storm calls all user-defined decorators through topology.kryo.decorators - * 7. Storm calls postDecorate hook + * 1. The Kryo instance is constructed using getKryo 2. Storm registers the default classes (e.g. arrays, lists, maps, etc.) 3. Storm calls preRegister hook 4. + * Storm registers all user-defined registrations through topology.kryo.register 5. Storm calls postRegister hook 6. Storm calls all user-defined decorators + * through topology.kryo.decorators 7. Storm calls postDecorate hook */ public interface IKryoFactory { Kryo getKryo(Map conf); + void preRegister(Kryo k, Map conf); + void postRegister(Kryo k, Map conf); + void postDecorate(Kryo k, Map conf); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/ITupleDeserializer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/ITupleDeserializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/ITupleDeserializer.java index 4e68658..641a472 100755 --- a/jstorm-core/src/main/java/backtype/storm/serialization/ITupleDeserializer.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/ITupleDeserializer.java @@ -21,5 +21,5 @@ import backtype.storm.tuple.Tuple; import java.io.IOException; public interface ITupleDeserializer { - Tuple deserialize(byte[] ser); + Tuple deserialize(byte[] ser); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/ITupleSerializer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/ITupleSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/ITupleSerializer.java index 90ad932..68df8bf 100755 --- a/jstorm-core/src/main/java/backtype/storm/serialization/ITupleSerializer.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/ITupleSerializer.java @@ -19,8 +19,7 @@ package backtype.storm.serialization; import backtype.storm.tuple.Tuple; - public interface ITupleSerializer { byte[] serialize(Tuple tuple); -// long crc32(Tuple tuple); + // long crc32(Tuple tuple); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java index 3496e68..bb8bcb4 100644 --- a/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java @@ -40,55 +40,65 @@ import java.util.Map; public class KryoTupleDeserializer implements ITupleDeserializer { private static final Logger LOG = LoggerFactory.getLogger(KryoTupleDeserializer.class); - + public static final boolean USE_RAW_PACKET = true; - + GeneralTopologyContext _context; KryoValuesDeserializer _kryo; SerializationFactory.IdDictionary _ids; Input _kryoInput; - + public KryoTupleDeserializer(final Map conf, final GeneralTopologyContext context) { _kryo = new KryoValuesDeserializer(conf); _context = context; _ids = new SerializationFactory.IdDictionary(context.getRawTopology()); _kryoInput = new Input(1); } - + public Tuple deserialize(byte[] ser) { - + _kryoInput.setBuffer(ser); + return deserialize(_kryoInput); + } + + public Tuple deserialize(byte[] ser, int offset, int count) { + _kryoInput.setBuffer(ser, offset, count); + return deserialize(_kryoInput); + } + + public Tuple deserialize(Input input) { int targetTaskId = 0; + long timeStamp = 0l; int taskId = 0; int streamId = 0; String componentName = null; String streamName = null; MessageId id = null; - + try { - - _kryoInput.setBuffer(ser); - - targetTaskId = _kryoInput.readInt(); - taskId = _kryoInput.readInt(true); - streamId = _kryoInput.readInt(true); + targetTaskId = input.readInt(); + timeStamp = input.readLong(); + taskId = input.readInt(true); + streamId = input.readInt(true); componentName = _context.getComponentId(taskId); streamName = _ids.getStreamName(componentName, streamId); - id = MessageId.deserialize(_kryoInput); - List<Object> values = _kryo.deserializeFrom(_kryoInput); + id = MessageId.deserialize(input); + List<Object> values = _kryo.deserializeFrom(input); TupleImplExt tuple = new TupleImplExt(_context, values, taskId, streamName, id); tuple.setTargetTaskId(targetTaskId); + tuple.setCreationTimeStamp(timeStamp); return tuple; } catch (Throwable e) { StringBuilder sb = new StringBuilder(); - + sb.append("Deserialize error:"); sb.append("targetTaskId:").append(targetTaskId); + sb.append(",creationTimeStamp:").append(timeStamp); sb.append(",taskId:").append(taskId); sb.append(",streamId:").append(streamId); sb.append(",componentName:").append(componentName); sb.append(",streamName:").append(streamName); sb.append(",MessageId").append(id); - + LOG.info(sb.toString(), e); throw new RuntimeException(e); } @@ -99,15 +109,14 @@ public class KryoTupleDeserializer implements ITupleDeserializer { int offset = 0; while(offset < ser.length) { - int tupleSize = Utils.readIntFromByteArray(ser, offset); + _kryoInput.setBuffer(ser, offset, offset + 4); + int tupleSize = _kryoInput.readInt(); offset += 4; - ByteBuffer buff = ByteBuffer.allocate(tupleSize); - buff.put(ser, offset, tupleSize); - ret.addToBatch(deserialize(buff.array())); + ret.addToBatch(deserialize(ser, offset, offset + tupleSize)); offset += tupleSize; } - + return ret; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java index 1c53d5d..e49e58b 100644 --- a/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java @@ -33,30 +33,35 @@ public class KryoTupleSerializer implements ITupleSerializer { KryoValuesSerializer _kryo; SerializationFactory.IdDictionary _ids; Output _kryoOut; - + public KryoTupleSerializer(final Map conf, final GeneralTopologyContext context) { _kryo = new KryoValuesSerializer(conf); _kryoOut = new Output(2000, 2000000000); _ids = new SerializationFactory.IdDictionary(context.getRawTopology()); } - + + public byte[] serialize(Tuple tuple) { + _kryoOut.clear(); + serializeTuple(_kryoOut, tuple); + return _kryoOut.toBytes(); + } /** * @@@ in the furture, it will skill serialize 'targetTask' through check some flag - * @see backtype.storm.serialization.ITupleSerializer#serialize(int, backtype.storm.tuple.Tuple) + * @see ITupleSerializer#serialize(int, Tuple) */ - public byte[] serialize(Tuple tuple) { + private void serializeTuple(Output output, Tuple tuple) { try { - - _kryoOut.clear(); if (tuple instanceof TupleExt) { - _kryoOut.writeInt(((TupleExt) tuple).getTargetTaskId()); + output.writeInt(((TupleExt) tuple).getTargetTaskId()); + output.writeLong(((TupleExt) tuple).getCreationTimeStamp()); } - - _kryoOut.writeInt(tuple.getSourceTask(), true); - _kryoOut.writeInt(_ids.getStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()), true); - tuple.getMessageId().serialize(_kryoOut); - _kryo.serializeInto(tuple.getValues(), _kryoOut); - return _kryoOut.toBytes(); + + output.writeInt(tuple.getSourceTask(), true); + output.writeInt( + _ids.getStreamId(tuple.getSourceComponent(), + tuple.getSourceStreamId()), true); + tuple.getMessageId().serialize(output); + _kryo.serializeInto(tuple.getValues(), output); } catch (IOException e) { throw new RuntimeException(e); } @@ -66,31 +71,28 @@ public class KryoTupleSerializer implements ITupleSerializer { if (batch == null || batch.currBatchSize() == 0) return null; - byte[][] bytes = new byte[batch.currBatchSize()][]; - int i = 0, len = 0; + _kryoOut.clear(); for (Tuple tuple : batch.getTuples()) { - /* byte structure: + /* + * byte structure: * 1st tuple: length + tuple bytes * 2nd tuple: length + tuple bytes * ...... */ - bytes[i] = serialize(tuple); - len += bytes[i].length; - // add length bytes (int) - len += 4; - i++; - } - - byte[] ret = new byte[len]; - int index = 0; - for (i = 0; i < bytes.length; i++) { - Utils.writeIntToByteArray(ret, index, bytes[i].length); - index += 4; - for (int j = 0; j < bytes[i].length; j++) { - ret[index++] = bytes[i][j]; - } + int startPos = _kryoOut.position(); + + // Set initial value of tuple length, which will be updated accordingly after serialization + _kryoOut.writeInt(0); + + serializeTuple(_kryoOut, tuple); + + // Update the tuple length + int endPos = _kryoOut.position(); + _kryoOut.setPosition(startPos); + _kryoOut.writeInt(endPos - startPos - 4); + _kryoOut.setPosition(endPos); } - return ret; + return _kryoOut.toBytes(); } public static byte[] serialize(int targetTask) { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java index 209ae53..45a7376 100755 --- a/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java @@ -28,22 +28,22 @@ import java.util.Map; public class KryoValuesDeserializer { Kryo _kryo; Input _kryoInput; - + public KryoValuesDeserializer(Map conf) { _kryo = SerializationFactory.getKryo(conf); _kryoInput = new Input(1); } - + public List<Object> deserializeFrom(Input input) { - ListDelegate delegate = (ListDelegate) _kryo.readObject(input, ListDelegate.class); - return delegate.getDelegate(); + ListDelegate delegate = (ListDelegate) _kryo.readObject(input, ListDelegate.class); + return delegate.getDelegate(); } - + public List<Object> deserialize(byte[] ser) throws IOException { _kryoInput.setBuffer(ser); return deserializeFrom(_kryoInput); } - + public Object deserializeObject(byte[] ser) throws IOException { _kryoInput.setBuffer(ser); return _kryo.readClassAndObject(_kryoInput); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java index c4a2f71..d53f1bd 100755 --- a/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java @@ -28,28 +28,28 @@ public class KryoValuesSerializer { Kryo _kryo; ListDelegate _delegate; Output _kryoOut; - + public KryoValuesSerializer(Map conf) { _kryo = SerializationFactory.getKryo(conf); _delegate = new ListDelegate(); _kryoOut = new Output(2000, 2000000000); } - + public void serializeInto(List<Object> values, Output out) throws IOException { // this ensures that list of values is always written the same way, regardless - // of whether it's a java collection or one of clojure's persistent collections + // of whether it's a java collection or one of clojure's persistent collections // (which have different serializers) // Doing this lets us deserialize as ArrayList and avoid writing the class here _delegate.setDelegate(values); - _kryo.writeObject(out, _delegate); + _kryo.writeObject(out, _delegate); } - + public byte[] serialize(List<Object> values) throws IOException { _kryoOut.clear(); serializeInto(values, _kryoOut); return _kryoOut.toBytes(); } - + public byte[] serializeObject(Object obj) { _kryoOut.clear(); _kryo.writeClassAndObject(_kryoOut, obj); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/SerializableSerializer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/SerializableSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/SerializableSerializer.java index 376ad2a..b60e8b8 100755 --- a/jstorm-core/src/main/java/backtype/storm/serialization/SerializableSerializer.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/SerializableSerializer.java @@ -30,7 +30,7 @@ import java.io.ObjectOutputStream; import org.apache.commons.io.input.ClassLoaderObjectInputStream; public class SerializableSerializer extends Serializer<Object> { - + @Override public void write(Kryo kryo, Output output, Object object) { ByteArrayOutputStream bos = new ByteArrayOutputStream(); @@ -45,7 +45,7 @@ public class SerializableSerializer extends Serializer<Object> { output.writeInt(ser.length); output.writeBytes(ser); } - + @Override public Object read(Kryo kryo, Input input, Class c) { int len = input.readInt(); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/SerializationFactory.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/SerializationFactory.java b/jstorm-core/src/main/java/backtype/storm/serialization/SerializationFactory.java index ef859be..ebf6158 100755 --- a/jstorm-core/src/main/java/backtype/storm/serialization/SerializationFactory.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/SerializationFactory.java @@ -21,7 +21,6 @@ import backtype.storm.Config; import backtype.storm.generated.ComponentCommon; import backtype.storm.generated.StormTopology; import backtype.storm.serialization.types.ArrayListSerializer; -import backtype.storm.serialization.types.ListDelegateSerializer; import backtype.storm.serialization.types.HashMapSerializer; import backtype.storm.serialization.types.HashSetSerializer; import backtype.storm.transactional.TransactionAttempt; @@ -33,27 +32,22 @@ import carbonite.JavaBridge; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.serializers.DefaultSerializers.BigIntegerSerializer; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.math.BigInteger; +import java.util.*; + public class SerializationFactory { public static final Logger LOG = LoggerFactory.getLogger(SerializationFactory.class); - + public static Kryo getKryo(Map conf) { IKryoFactory kryoFactory = (IKryoFactory) Utils.newInstance((String) conf.get(Config.TOPOLOGY_KRYO_FACTORY)); Kryo k = kryoFactory.getKryo(conf); if (WorkerClassLoader.getInstance() != null) k.setClassLoader(WorkerClassLoader.getInstance()); k.register(byte[].class); - + /* tuple payload serializer is specified via configuration */ String payloadSerializerName = (String) conf.get(Config.TOPOLOGY_TUPLE_SERIALIZER); try { @@ -63,7 +57,7 @@ public class SerializationFactory { } catch (ClassNotFoundException ex) { throw new RuntimeException(ex); } - + k.register(ArrayList.class, new ArrayListSerializer()); k.register(HashMap.class, new HashMapSerializer()); k.register(HashSet.class, new HashSetSerializer()); @@ -78,17 +72,17 @@ public class SerializationFactory { } catch (Exception e) { throw new RuntimeException(e); } - + Map<String, String> registrations = normalizeKryoRegister(conf); - + kryoFactory.preRegister(k, conf); - + boolean skipMissing = (Boolean) conf.get(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS); for (String klassName : registrations.keySet()) { String serializerClassName = registrations.get(klassName); try { Class klass = Class.forName(klassName, true, k.getClassLoader()); - + Class serializerClass = null; if (serializerClassName != null) serializerClass = Class.forName(serializerClassName, true, k.getClassLoader()); @@ -105,9 +99,9 @@ public class SerializationFactory { } } } - + kryoFactory.postRegister(k, conf); - + if (conf.get(Config.TOPOLOGY_KRYO_DECORATORS) != null) { for (String klassName : (List<String>) conf.get(Config.TOPOLOGY_KRYO_DECORATORS)) { try { @@ -127,21 +121,21 @@ public class SerializationFactory { } } } - + kryoFactory.postDecorate(k, conf); - + return k; } - + public static class IdDictionary { Map<String, Map<String, Integer>> streamNametoId = new HashMap<String, Map<String, Integer>>(); Map<String, Map<Integer, String>> streamIdToName = new HashMap<String, Map<Integer, String>>(); - + public IdDictionary(StormTopology topology) { List<String> componentNames = new ArrayList<String>(topology.get_spouts().keySet()); componentNames.addAll(topology.get_bolts().keySet()); componentNames.addAll(topology.get_state_spouts().keySet()); - + for (String name : componentNames) { ComponentCommon common = Utils.getComponentCommon(topology, name); List<String> streams = new ArrayList<String>(common.get_streams().keySet()); @@ -149,15 +143,15 @@ public class SerializationFactory { streamIdToName.put(name, Utils.reverseMap(streamNametoId.get(name))); } } - + public int getStreamId(String component, String stream) { return streamNametoId.get(component).get(stream); } - + public String getStreamName(String component, int stream) { return streamIdToName.get(component).get(stream); } - + private static Map<String, Integer> idify(List<String> names) { Collections.sort(names); Map<String, Integer> ret = new HashMap<String, Integer>(); @@ -169,7 +163,7 @@ public class SerializationFactory { return ret; } } - + private static Serializer resolveSerializerInstance(Kryo k, Class superClass, Class<? extends Serializer> serializerClass, Map conf) { try { try { @@ -201,7 +195,7 @@ public class SerializationFactory { throw new IllegalArgumentException("Unable to create serializer \"" + serializerClass.getName() + "\" for class: " + superClass.getName(), ex); } } - + private static Map<String, String> normalizeKryoRegister(Map conf) { // TODO: de-duplicate this logic with the code in nimbus Object res = conf.get(Config.TOPOLOGY_KRYO_REGISTER); @@ -219,7 +213,7 @@ public class SerializationFactory { } } } - + // ensure always same order for registrations with TreeMap return new TreeMap<String, String>(ret); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/ThriftSerializationDelegate.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/ThriftSerializationDelegate.java b/jstorm-core/src/main/java/backtype/storm/serialization/ThriftSerializationDelegate.java index f5d03e4..ba37614 100755 --- a/jstorm-core/src/main/java/backtype/storm/serialization/ThriftSerializationDelegate.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/ThriftSerializationDelegate.java @@ -33,7 +33,7 @@ public class ThriftSerializationDelegate implements SerializationDelegate { @Override public byte[] serialize(Object object) { try { - return new TSerializer().serialize((TBase) object); + return new TSerializer().serialize((TBase) object); } catch (TException e) { throw new RuntimeException(e); } @@ -44,7 +44,7 @@ public class ThriftSerializationDelegate implements SerializationDelegate { try { TBase instance = (TBase) clazz.newInstance(); new TDeserializer().deserialize(instance, bytes); - return (T)instance; + return (T) instance; } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java index 6b7e308..a4bac2f 100755 --- a/jstorm-core/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java @@ -23,10 +23,9 @@ import com.esotericsoftware.kryo.serializers.CollectionSerializer; import java.util.ArrayList; import java.util.Collection; - public class ArrayListSerializer extends CollectionSerializer { @Override public Collection create(Kryo kryo, Input input, Class<Collection> type) { return new ArrayList(); - } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java index 662211b..00af80d 100755 --- a/jstorm-core/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java @@ -23,7 +23,6 @@ import com.esotericsoftware.kryo.serializers.MapSerializer; import java.util.HashMap; import java.util.Map; - public class HashMapSerializer extends MapSerializer { @Override public Map create(Kryo kryo, Input input, Class<Map> type) { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java index 77fc353..eb3aab0 100755 --- a/jstorm-core/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java @@ -23,10 +23,9 @@ import com.esotericsoftware.kryo.serializers.CollectionSerializer; import java.util.Collection; import java.util.HashSet; - public class HashSetSerializer extends CollectionSerializer { @Override public Collection create(Kryo kryo, Input input, Class<Collection> type) { return new HashSet(); - } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java index c71a19d..c65f16a 100755 --- a/jstorm-core/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java +++ b/jstorm-core/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java @@ -23,10 +23,9 @@ import com.esotericsoftware.kryo.serializers.CollectionSerializer; import backtype.storm.utils.ListDelegate; import java.util.Collection; - public class ListDelegateSerializer extends CollectionSerializer { @Override public Collection create(Kryo kryo, Input input, Class<Collection> type) { return new ListDelegate(); - } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java b/jstorm-core/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java index 5999fbb..9b837ba 100755 --- a/jstorm-core/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java @@ -18,6 +18,7 @@ package backtype.storm.spout; public interface IMultiSchemableSpout { - MultiScheme getScheme(); - void setScheme(MultiScheme scheme); + MultiScheme getScheme(); + + void setScheme(MultiScheme scheme); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/spout/ISchemableSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/spout/ISchemableSpout.java b/jstorm-core/src/main/java/backtype/storm/spout/ISchemableSpout.java index df455d9..7eca980 100755 --- a/jstorm-core/src/main/java/backtype/storm/spout/ISchemableSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/spout/ISchemableSpout.java @@ -17,8 +17,8 @@ */ package backtype.storm.spout; - public interface ISchemableSpout { - Scheme getScheme(); - void setScheme(Scheme scheme); + Scheme getScheme(); + + void setScheme(Scheme scheme); }
