[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

2015-11-13 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/838#issuecomment-156504222
  
@ptgoetz Sounds good. The good news is that Pacemaker is still off by 
default, so aside from making ClusterState pluggable, there should be no 
changes to the existing code path.

I'm currently working on documentation for it as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44989185
  
--- Diff: 
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
 ---
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.blobstore.AtomicOutputStream;
+import backtype.storm.blobstore.AtomicOutputStream;
+import backtype.storm.blobstore.BlobStore;
+import backtype.storm.blobstore.BlobStoreAclHandler;
+import backtype.storm.blobstore.BlobStoreFile;
+import backtype.storm.blobstore.InputStreamWithMeta;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift7.TBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Iterator;
+import java.util.Map;
+
+import static backtype.storm.blobstore.BlobStoreAclHandler.ADMIN;
+import static backtype.storm.blobstore.BlobStoreAclHandler.READ;
+import static backtype.storm.blobstore.BlobStoreAclHandler.WRITE;
+
+/**
+ * Provides a HDFS file system backed blob store implementation.
+ * Note that this provides an api for having HDFS be the backing store for 
the blobstore,
+ * it is not a service/daemon.
+ */
+public class HdfsBlobStore extends BlobStore {
+  public static final Logger LOG = 
LoggerFactory.getLogger(HdfsBlobStore.class);
+  private static final String DATA_PREFIX = "data_";
+  private static final String META_PREFIX = "meta_";
+  private BlobStoreAclHandler _aclHandler;
+  private HdfsBlobStoreImpl _hbs;
+  private Subject _localSubject;
+  private Map conf;
+
+  /*
+   * Get the subject from Hadoop so we can use it to validate the acls. 
There is no direct
+   * interface from UserGroupInformation to get the subject, so do a doAs 
and get the context.
+   * We could probably run everything in the doAs but for now just grab 
the subject.
+   */
--- End diff --

Can we turn this into an actual javadoc-style comment?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44989413
  
--- Diff: 
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
 ---
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.blobstore.AtomicOutputStream;
+import backtype.storm.blobstore.AtomicOutputStream;
+import backtype.storm.blobstore.BlobStore;
+import backtype.storm.blobstore.BlobStoreAclHandler;
+import backtype.storm.blobstore.BlobStoreFile;
+import backtype.storm.blobstore.InputStreamWithMeta;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift7.TBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Iterator;
+import java.util.Map;
+
+import static backtype.storm.blobstore.BlobStoreAclHandler.ADMIN;
+import static backtype.storm.blobstore.BlobStoreAclHandler.READ;
+import static backtype.storm.blobstore.BlobStoreAclHandler.WRITE;
+
+/**
+ * Provides a HDFS file system backed blob store implementation.
+ * Note that this provides an api for having HDFS be the backing store for 
the blobstore,
+ * it is not a service/daemon.
+ */
+public class HdfsBlobStore extends BlobStore {
--- End diff --

This file uses 2-space indentation, whereas the rest of the project uses 
4-space.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44989679
  
--- Diff: 
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java
 ---
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import backtype.storm.blobstore.BlobStoreFile;
+import backtype.storm.generated.SettableBlobMeta;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.regex.Matcher;
+
+public class HdfsBlobStoreFile extends BlobStoreFile {
+  public static final Logger LOG = 
LoggerFactory.getLogger(HdfsBlobStoreFile.class);
+
+  private final String _key;
+  private final boolean _isTmp;
+  private final Path _path;
+  private Long _modTime = null;
+  private final boolean _mustBeNew;
+  private final Configuration _hadoopConf;
+  private final FileSystem _fs;
+  private SettableBlobMeta meta;
+
+  // files are world-wide readable and owner writable
+  final public static FsPermission BLOBSTORE_FILE_PERMISSION =
+  FsPermission.createImmutable((short) 0644); // rw-r--r--
+
+  public HdfsBlobStoreFile(Path base, String name, Configuration hconf) {
--- End diff --

Same 2-spacing here. If there are more files using 2-spacing I won't 
mention them.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44989851
  
--- Diff: 
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
 ---
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.blobstore.AtomicOutputStream;
+import backtype.storm.blobstore.AtomicOutputStream;
+import backtype.storm.blobstore.BlobStore;
+import backtype.storm.blobstore.BlobStoreAclHandler;
+import backtype.storm.blobstore.BlobStoreFile;
+import backtype.storm.blobstore.InputStreamWithMeta;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift7.TBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Iterator;
+import java.util.Map;
+
+import static backtype.storm.blobstore.BlobStoreAclHandler.ADMIN;
+import static backtype.storm.blobstore.BlobStoreAclHandler.READ;
+import static backtype.storm.blobstore.BlobStoreAclHandler.WRITE;
+
+/**
+ * Provides a HDFS file system backed blob store implementation.
+ * Note that this provides an api for having HDFS be the backing store for 
the blobstore,
+ * it is not a service/daemon.
+ */
+public class HdfsBlobStore extends BlobStore {
+  public static final Logger LOG = 
LoggerFactory.getLogger(HdfsBlobStore.class);
+  private static final String DATA_PREFIX = "data_";
+  private static final String META_PREFIX = "meta_";
+  private BlobStoreAclHandler _aclHandler;
+  private HdfsBlobStoreImpl _hbs;
+  private Subject _localSubject;
+  private Map conf;
+
+  /*
+   * Get the subject from Hadoop so we can use it to validate the acls. 
There is no direct
+   * interface from UserGroupInformation to get the subject, so do a doAs 
and get the context.
+   * We could probably run everything in the doAs but for now just grab 
the subject.
+   */
+  private Subject getHadoopUser() {
+Subject subj;
+try {
+  subj = UserGroupInformation.getCurrentUser().doAs(
+  new PrivilegedAction() {
+@Override
+public Subject run() {
+  return Subject.getSubject(AccessController.getContext());
+}
+  });
+} catch (IOException e) {
+  throw new RuntimeException("Error creating subject and logging user 
in!", e);
+}
+return subj;
+  }
+
+  // If who is null then we want to use the user hadoop says we are.
+  // Required for the supervisor to call these routines as its not
+  // logged in as anyone.
+  private Subject checkAndGetSubject(Subject who) {
+if (who == null) {
+  return _localSubject;
+}
+return who;
+  }
+
+  @Override
+  public void prepare(Map conf, String overrideBase, NimbusInfo 
nimbusInfo) {
+this.conf = conf;
+prepareInternal(conf, overrideBase, null);
+  }
+
+  /*
+   * Allow a Hadoop Configuration to be passed for testing. If it's null 
then

[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44991284
  
--- Diff: storm-core/src/clj/backtype/storm/command/blobstore.clj ---
@@ -0,0 +1,162 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.command.blobstore
+  (:import [java.io InputStream OutputStream])
+  (:use [backtype.storm config])
+  (:import [backtype.storm.generated SettableBlobMeta AccessControl 
AuthorizationException
+KeyNotFoundException])
+  (:import [backtype.storm.blobstore BlobStoreAclHandler])
+  (:use [clojure.string :only [split]])
+  (:use [clojure.tools.cli :only [cli]])
+  (:use [clojure.java.io :only [copy input-stream output-stream]])
+  (:use [backtype.storm blobstore log util])
--- End diff --

Since this is a new file, can we merge all the `use`s and `import`s into 
one `use` and one `import` form?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44994348
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyVersion.java ---
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the version of the key to be stored within the 
zookeeper
+ */
+public class KeyVersion {
+  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+  private final String BLOBSTORE_SUBTREE="/blobstore";
+  private final String 
BLOBSTORE_KEY_COUNTER_SUBTREE="/blobstorekeycounter";
+  private String key;
+  private NimbusInfo nimbusInfo;
+
+  public KeyVersion(String key, NimbusInfo nimbusInfo) {
+this.key = key;
+this.nimbusInfo = nimbusInfo;
+  }
+
+  public int getKey(Map conf) {
--- End diff --

The name of this method is confusing. It does not get a key. It gets a 
version.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44995171
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
@@ -326,16 +331,29 @@
  (log-error t "Error when 
processing event")
  (exit-process! 20 "Error when 
processing an event")
  ))
+   :blob-update-timer (mk-timer :kill-fn (fn [t]
+   (log-error t "Error when 
processing blob-update")
+   (exit-process! 20 "Error when 
processing a blob-update")
+   ))
--- End diff --

Danglers


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44995542
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
@@ -326,16 +331,29 @@
  (log-error t "Error when 
processing event")
  (exit-process! 20 "Error when 
processing an event")
  ))
+   :blob-update-timer (mk-timer :kill-fn (fn [t]
+   (log-error t "Error when 
processing blob-update")
+   (exit-process! 20 "Error when 
processing a blob-update")
--- End diff --

Do we want a different error code for blob-update?
Maybe not, I'm not sure. The error message is different than the others.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44995641
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
@@ -326,16 +331,29 @@
  (log-error t "Error when 
processing event")
  (exit-process! 20 "Error when 
processing an event")
  ))
+   :blob-update-timer (mk-timer :kill-fn (fn [t]
+   (log-error t "Error when 
processing blob-update")
+   (exit-process! 20 "Error when 
processing a blob-update")
+   ))
+   :localizer (Utils/createLocalizer conf (supervisor-local-dir conf))
:assignment-versions (atom {})
:sync-retry (atom 0)
-   :code-distributor (mk-code-distributor conf)
:download-lock (Object.)
:stormid->profiler-actions (atom {})
})
 
+(defn required-topo-files-exist?
+  [conf storm-id]
+  (let [stormroot (supervisor-stormdist-root conf storm-id)
+stormjarpath (supervisor-stormjar-path stormroot)
+stormcodepath (supervisor-stormcode-path stormroot)
+stormconfpath (supervisor-stormconf-path stormroot)]
+(and (every? exists-file? [stormroot stormconfpath stormcodepath])
+  (or (local-mode? conf)
+(exists-file? stormjarpath)
--- End diff --

Indentation on 352 and 353 is wrong. Should be inline with previous args.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44996296
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
@@ -454,10 +456,78 @@
   (shutdown-worker supervisor id))
 ))
 
+(defn get-blob-localname
+  "Given the blob information either gets the localname field if it exists,
+  else routines the default value passed in."
+  [blob-info defaultValue]
+  (if-let [val (if blob-info (get blob-info "localname") nil)] val 
defaultValue))
+
+(defn should-uncompress-blob?
+  "Given the blob information returns the value of the uncompress field, 
handling it either being
+  a string or a boolean value, or ifs its not specified then returns false"
+  [blob-info]
+  (boolean (and blob-info
+ (if-let [val (get blob-info "uncompress")]
+   (.booleanValue (Boolean. val))
+
+(defn remove-blob-references
+  "Remove a reference to a blob when its no longer needed."
+  [localizer storm-id conf]
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+user (storm-conf TOPOLOGY-SUBMITTER-USER)
+topo-name (storm-conf TOPOLOGY-NAME)]
+(if blobstore-map (doseq [[k, v] blobstore-map]
--- End diff --

the 'then' part should be on a new line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44996393
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
@@ -454,10 +456,78 @@
   (shutdown-worker supervisor id))
 ))
 
+(defn get-blob-localname
+  "Given the blob information either gets the localname field if it exists,
+  else routines the default value passed in."
+  [blob-info defaultValue]
+  (if-let [val (if blob-info (get blob-info "localname") nil)] val 
defaultValue))
+
+(defn should-uncompress-blob?
+  "Given the blob information returns the value of the uncompress field, 
handling it either being
+  a string or a boolean value, or ifs its not specified then returns false"
+  [blob-info]
+  (boolean (and blob-info
+ (if-let [val (get blob-info "uncompress")]
+   (.booleanValue (Boolean. val))
+
+(defn remove-blob-references
+  "Remove a reference to a blob when its no longer needed."
+  [localizer storm-id conf]
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+user (storm-conf TOPOLOGY-SUBMITTER-USER)
+topo-name (storm-conf TOPOLOGY-NAME)]
+(if blobstore-map (doseq [[k, v] blobstore-map]
+(.removeBlobReference localizer
+  k
+  user
+  topo-name
+  (should-uncompress-blob? v))
+
+(defn blobstore-map-to-localresources
+  "Returns a list of LocalResources based on the blobstore-map passed in."
+  [blobstore-map]
+  (if blobstore-map
+(for [[k, v] blobstore-map] (LocalResource. k (should-uncompress-blob? 
v)))
+()))
--- End diff --

Body should be on its own line. 
What is the empty pair of parens here for?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44996771
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
@@ -454,10 +456,78 @@
   (shutdown-worker supervisor id))
 ))
 
+(defn get-blob-localname
+  "Given the blob information either gets the localname field if it exists,
+  else routines the default value passed in."
+  [blob-info defaultValue]
+  (if-let [val (if blob-info (get blob-info "localname") nil)] val 
defaultValue))
+
+(defn should-uncompress-blob?
+  "Given the blob information returns the value of the uncompress field, 
handling it either being
+  a string or a boolean value, or ifs its not specified then returns false"
+  [blob-info]
+  (boolean (and blob-info
+ (if-let [val (get blob-info "uncompress")]
+   (.booleanValue (Boolean. val))
+
+(defn remove-blob-references
+  "Remove a reference to a blob when its no longer needed."
+  [localizer storm-id conf]
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+user (storm-conf TOPOLOGY-SUBMITTER-USER)
+topo-name (storm-conf TOPOLOGY-NAME)]
+(if blobstore-map (doseq [[k, v] blobstore-map]
+(.removeBlobReference localizer
+  k
+  user
+  topo-name
+  (should-uncompress-blob? v))
+
+(defn blobstore-map-to-localresources
+  "Returns a list of LocalResources based on the blobstore-map passed in."
+  [blobstore-map]
+  (if blobstore-map
+(for [[k, v] blobstore-map] (LocalResource. k (should-uncompress-blob? 
v)))
+()))
+
+(defn add-blob-references
+  "For each of the downloaded topologies, adds references to the blobs 
that the topologies are
+  using. This is used to reconstruct the cache on restart."
+  [localizer storm-id conf]
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+user (storm-conf TOPOLOGY-SUBMITTER-USER)
+topo-name (storm-conf TOPOLOGY-NAME)
+localresources (blobstore-map-to-localresources blobstore-map)]
+(if blobstore-map (.addReferences localizer localresources user 
topo-name
--- End diff --

'then' on its own line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44996811
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
@@ -454,10 +456,78 @@
   (shutdown-worker supervisor id))
 ))
 
+(defn get-blob-localname
+  "Given the blob information either gets the localname field if it exists,
+  else routines the default value passed in."
+  [blob-info defaultValue]
+  (if-let [val (if blob-info (get blob-info "localname") nil)] val 
defaultValue))
+
+(defn should-uncompress-blob?
+  "Given the blob information returns the value of the uncompress field, 
handling it either being
+  a string or a boolean value, or ifs its not specified then returns false"
+  [blob-info]
+  (boolean (and blob-info
+ (if-let [val (get blob-info "uncompress")]
+   (.booleanValue (Boolean. val))
+
+(defn remove-blob-references
+  "Remove a reference to a blob when its no longer needed."
+  [localizer storm-id conf]
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+user (storm-conf TOPOLOGY-SUBMITTER-USER)
+topo-name (storm-conf TOPOLOGY-NAME)]
+(if blobstore-map (doseq [[k, v] blobstore-map]
+(.removeBlobReference localizer
+  k
+  user
+  topo-name
+  (should-uncompress-blob? v))
+
+(defn blobstore-map-to-localresources
+  "Returns a list of LocalResources based on the blobstore-map passed in."
+  [blobstore-map]
+  (if blobstore-map
+(for [[k, v] blobstore-map] (LocalResource. k (should-uncompress-blob? 
v)))
+()))
+
+(defn add-blob-references
+  "For each of the downloaded topologies, adds references to the blobs 
that the topologies are
+  using. This is used to reconstruct the cache on restart."
+  [localizer storm-id conf]
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+user (storm-conf TOPOLOGY-SUBMITTER-USER)
+topo-name (storm-conf TOPOLOGY-NAME)
+localresources (blobstore-map-to-localresources blobstore-map)]
+(if blobstore-map (.addReferences localizer localresources user 
topo-name
+
+(defn rm-topo-files
+  [conf storm-id localizer rm-blob-refs?]
+  (try
+(if (= true rm-blob-refs?)
+  (remove-blob-references localizer storm-id conf))
+(if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+  (rmr-as-user conf storm-id (supervisor-stormdist-root conf storm-id))
+  (rmr (supervisor-stormdist-root conf storm-id)))
+(catch Exception e (log-message e (str "Exception removing: " 
storm-id)
--- End diff --

Catch body on its own line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44996912
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
@@ -454,10 +456,78 @@
   (shutdown-worker supervisor id))
 ))
 
+(defn get-blob-localname
+  "Given the blob information either gets the localname field if it exists,
+  else routines the default value passed in."
+  [blob-info defaultValue]
+  (if-let [val (if blob-info (get blob-info "localname") nil)] val 
defaultValue))
+
+(defn should-uncompress-blob?
+  "Given the blob information returns the value of the uncompress field, 
handling it either being
+  a string or a boolean value, or ifs its not specified then returns false"
+  [blob-info]
+  (boolean (and blob-info
+ (if-let [val (get blob-info "uncompress")]
+   (.booleanValue (Boolean. val))
+
+(defn remove-blob-references
+  "Remove a reference to a blob when its no longer needed."
+  [localizer storm-id conf]
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+user (storm-conf TOPOLOGY-SUBMITTER-USER)
+topo-name (storm-conf TOPOLOGY-NAME)]
+(if blobstore-map (doseq [[k, v] blobstore-map]
+(.removeBlobReference localizer
+  k
+  user
+  topo-name
+  (should-uncompress-blob? v))
+
+(defn blobstore-map-to-localresources
+  "Returns a list of LocalResources based on the blobstore-map passed in."
+  [blobstore-map]
+  (if blobstore-map
+(for [[k, v] blobstore-map] (LocalResource. k (should-uncompress-blob? 
v)))
+()))
+
+(defn add-blob-references
+  "For each of the downloaded topologies, adds references to the blobs 
that the topologies are
+  using. This is used to reconstruct the cache on restart."
+  [localizer storm-id conf]
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+user (storm-conf TOPOLOGY-SUBMITTER-USER)
+topo-name (storm-conf TOPOLOGY-NAME)
+localresources (blobstore-map-to-localresources blobstore-map)]
+(if blobstore-map (.addReferences localizer localresources user 
topo-name
+
+(defn rm-topo-files
+  [conf storm-id localizer rm-blob-refs?]
+  (try
+(if (= true rm-blob-refs?)
+  (remove-blob-references localizer storm-id conf))
+(if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+  (rmr-as-user conf storm-id (supervisor-stormdist-root conf storm-id))
+  (rmr (supervisor-stormdist-root conf storm-id)))
+(catch Exception e (log-message e (str "Exception removing: " 
storm-id)
+
+(defn verify-downloaded-files [conf localizer assigned-storm-ids 
all-downloaded-storm-ids]
+  "Method written to check for the files exists to avoid supervisor 
crashing
+   Also makes sure there is no necessity for locking"
+  (remove nil?
+(into #{}
+  (for [storm-id all-downloaded-storm-ids
+:let [rm-blob-refs? false]
+:when (contains? assigned-storm-ids storm-id)]
+(if (not (required-topo-files-exist? conf storm-id))
+  (do
+(log-debug "Files not present in topology directory")
+(rm-topo-files conf storm-id localizer rm-blob-refs?) 
storm-id))
--- End diff --

storm-id should get its own line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44997103
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
@@ -526,6 +605,41 @@
   {Config/SUPERVISOR_MEMORY_CAPACITY_MB (double (conf 
SUPERVISOR-MEMORY-CAPACITY-MB))
Config/SUPERVISOR_CPU_CAPACITY (double (conf SUPERVISOR-CPU-CAPACITY))})
 
+(defn update-blobs-for-topology!
+  "Update each blob listed in the topology configuration if the latest 
version of the blob
+   has not been downloaded."
+  [conf storm-id localizer]
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+user (storm-conf TOPOLOGY-SUBMITTER-USER)
+topo-name (storm-conf TOPOLOGY-NAME)
+user-dir (.getLocalUserFileCacheDir localizer user)
+localresources (blobstore-map-to-localresources blobstore-map)]
+(try
+  (.updateBlobs localizer localresources user)
+  (catch AuthorizationException authExp
+(log-error authExp))
+  (catch KeyNotFoundException knf
+(log-error knf)
+
+(defn update-blobs-for-all-topologies-fn
+  "Returns a function that downloads all blobs listed in the topology 
configuration for all topologies assigned
+  to this supervisor, and creates version files with a suffix. The 
returned function is intended to be run periodically
+  by a timer, created elsewhere."
+  [supervisor]
+  (fn this []
--- End diff --

Don't name the function 'this'. Either give it a meaningful name or get rid 
of the name.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44997133
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
@@ -526,6 +605,41 @@
   {Config/SUPERVISOR_MEMORY_CAPACITY_MB (double (conf 
SUPERVISOR-MEMORY-CAPACITY-MB))
Config/SUPERVISOR_CPU_CAPACITY (double (conf SUPERVISOR-CPU-CAPACITY))})
 
+(defn update-blobs-for-topology!
+  "Update each blob listed in the topology configuration if the latest 
version of the blob
+   has not been downloaded."
+  [conf storm-id localizer]
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+user (storm-conf TOPOLOGY-SUBMITTER-USER)
+topo-name (storm-conf TOPOLOGY-NAME)
+user-dir (.getLocalUserFileCacheDir localizer user)
+localresources (blobstore-map-to-localresources blobstore-map)]
+(try
+  (.updateBlobs localizer localresources user)
+  (catch AuthorizationException authExp
+(log-error authExp))
+  (catch KeyNotFoundException knf
+(log-error knf)
+
+(defn update-blobs-for-all-topologies-fn
+  "Returns a function that downloads all blobs listed in the topology 
configuration for all topologies assigned
+  to this supervisor, and creates version files with a suffix. The 
returned function is intended to be run periodically
+  by a timer, created elsewhere."
+  [supervisor]
+  (fn this []
+(try
+  (let [conf (:conf supervisor)
+downloaded-storm-ids (set (read-downloaded-storm-ids conf))
+new-assignment @(:curr-assignment supervisor)
+assigned-storm-ids (assigned-storm-ids-from-port-assignments 
new-assignment)]
+(doseq [topology-id downloaded-storm-ids]
+  (let [storm-root (supervisor-stormdist-root conf topology-id)]
+(when (assigned-storm-ids topology-id)
+  (log-debug "Checking Blob updates for storm topology id " 
topology-id " With target_dir: " storm-root)
+  (update-blobs-for-topology! conf topology-id (:localizer 
supervisor))
+  (catch Exception e (log-error e "Error updating blobs, will retry 
again later")
--- End diff --

Exception body on its own line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44997265
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
@@ -671,6 +787,11 @@
 0
 (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
 heartbeat-fn)
+(doseq [storm-id downloaded-storm-ids] (add-blob-references 
(:localizer supervisor) storm-id
+ conf))
--- End diff --

doseq body should have its own line. Then conf can be moved back into the 
same line as the function it is being passed to.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44997502
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
@@ -732,25 +863,89 @@
  (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
   (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) 
["code-dir" dir] :log-prefix (str "setup conf for " dir
 
+(defn setup-blob-permission [conf storm-conf path]
+  (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+(worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) 
["blob" path] :log-prefix (str "setup blob permissions for " path
+
+(defn setup-storm-code-dir [conf storm-conf dir]
+  (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+(worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) 
["code-dir" dir] :log-prefix (str "setup conf for " dir
--- End diff --

It might be nice to break these function calls up, but that's really 
nit-picky. Not a big deal at all.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44997547
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
@@ -732,25 +863,89 @@
  (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
   (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) 
["code-dir" dir] :log-prefix (str "setup conf for " dir
 
+(defn setup-blob-permission [conf storm-conf path]
+  (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+(worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) 
["blob" path] :log-prefix (str "setup blob permissions for " path
+
+(defn setup-storm-code-dir [conf storm-conf dir]
+  (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+(worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) 
["code-dir" dir] :log-prefix (str "setup conf for " dir
+
+(defn download-blobs-for-topology!
+  "Download all blobs listed in the topology configuration for a given 
topology."
+  [conf stormconf-path localizer tmproot]
+  (let [storm-conf (read-supervisor-storm-conf-given-path conf 
stormconf-path)
+blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+user (storm-conf TOPOLOGY-SUBMITTER-USER)
+topo-name (storm-conf TOPOLOGY-NAME)
+user-dir (.getLocalUserFileCacheDir localizer user)
+localresources (blobstore-map-to-localresources blobstore-map)]
+(when localresources
+  (when-not (.exists user-dir)
+(FileUtils/forceMkdir user-dir)
+(setup-blob-permission conf storm-conf (.toString user-dir)))
+  (try
+(let [localized-resources (.getBlobs localizer localresources user 
topo-name user-dir)]
+  (setup-blob-permission conf storm-conf (.toString user-dir))
+  (doseq [local-rsrc localized-resources]
+(let [rsrc-file-path (File. (.getFilePath local-rsrc))
+  key-name (.getName rsrc-file-path)
+  blob-symlink-target-name (.getName (File. 
(.getCurrentSymlinkPath local-rsrc)))
+  symlink-name (get-blob-localname (get blobstore-map 
key-name) key-name)]
+  (create-symlink! tmproot (.getParent rsrc-file-path) 
symlink-name
+blob-symlink-target-name
+(catch AuthorizationException authExp
+  (log-error authExp))
+(catch KeyNotFoundException knf
+  (log-error knf))
+
+(defn get-blob-file-names
+  [blobstore-map]
+  (if blobstore-map
+(for [[k, data] blobstore-map] (get-blob-localname data k
--- End diff --

body on own line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44997943
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
@@ -927,31 +1133,32 @@
first ))
 
 (defmethod download-storm-code
-:local [conf storm-id master-code-dir supervisor download-lock]
-(let [stormroot (supervisor-stormdist-root conf storm-id)]
-  (locking download-lock
-(FileUtils/copyDirectory (File. master-code-dir) (File. 
stormroot))
-(let [classloader (.getContextClassLoader 
(Thread/currentThread))
-  resources-jar (resources-jar)
-  url (.getResource classloader RESOURCES-SUBDIR)
-  target-dir (str stormroot file-path-separator 
RESOURCES-SUBDIR)]
-  (cond
-   resources-jar
-   (do
- (log-message "Extracting resources from jar at " 
resources-jar " to " target-dir)
- (extract-dir-from-jar resources-jar RESOURCES-SUBDIR 
stormroot))
-   url
-   (do
- (log-message "Copying resources at " (URI. (str url)) " 
to " target-dir)
- (if (= (.getProtocol url) "jar" )
-   (extract-dir-from-jar (.getFile (.getJarFileURL 
(.openConnection url))) RESOURCES-SUBDIR stormroot)
-   (FileUtils/copyDirectory (File. (.getPath (URI. (str 
url (File. target-dir)))
- )
-   )
-  )
-)))
-
-(defmethod mk-code-distributor :local [conf] nil)
+  :local [conf storm-id master-code-dir localizer]
+  (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
+stormroot (supervisor-stormdist-root conf storm-id)
+blob-store (Utils/getNimbusBlobStore conf master-code-dir nil)]
+(try
+  (FileUtils/forceMkdir (File. tmproot))
+  (.readBlobTo blob-store (master-stormcode-key storm-id) 
(FileOutputStream. (supervisor-stormcode-path tmproot)) nil)
+  (.readBlobTo blob-store (master-stormconf-key storm-id) 
(FileOutputStream. (supervisor-stormconf-path tmproot)) nil)
+  (finally
+(.shutdown blob-store)))
+(FileUtils/moveDirectory (File. tmproot) (File. stormroot))
+(setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) 
stormroot)
+(let [classloader (.getContextClassLoader (Thread/currentThread))
+  resources-jar (resources-jar)
+  url (.getResource classloader RESOURCES-SUBDIR)
+  target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)]
+  (cond
+resources-jar
--- End diff --

Lots of blank space at the end of this line?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r44997972
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
@@ -927,31 +1133,32 @@
first ))
 
 (defmethod download-storm-code
-:local [conf storm-id master-code-dir supervisor download-lock]
-(let [stormroot (supervisor-stormdist-root conf storm-id)]
-  (locking download-lock
-(FileUtils/copyDirectory (File. master-code-dir) (File. 
stormroot))
-(let [classloader (.getContextClassLoader 
(Thread/currentThread))
-  resources-jar (resources-jar)
-  url (.getResource classloader RESOURCES-SUBDIR)
-  target-dir (str stormroot file-path-separator 
RESOURCES-SUBDIR)]
-  (cond
-   resources-jar
-   (do
- (log-message "Extracting resources from jar at " 
resources-jar " to " target-dir)
- (extract-dir-from-jar resources-jar RESOURCES-SUBDIR 
stormroot))
-   url
-   (do
- (log-message "Copying resources at " (URI. (str url)) " 
to " target-dir)
- (if (= (.getProtocol url) "jar" )
-   (extract-dir-from-jar (.getFile (.getJarFileURL 
(.openConnection url))) RESOURCES-SUBDIR stormroot)
-   (FileUtils/copyDirectory (File. (.getPath (URI. (str 
url (File. target-dir)))
- )
-   )
-  )
-)))
-
-(defmethod mk-code-distributor :local [conf] nil)
+  :local [conf storm-id master-code-dir localizer]
+  (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
+stormroot (supervisor-stormdist-root conf storm-id)
+blob-store (Utils/getNimbusBlobStore conf master-code-dir nil)]
+(try
+  (FileUtils/forceMkdir (File. tmproot))
+  (.readBlobTo blob-store (master-stormcode-key storm-id) 
(FileOutputStream. (supervisor-stormcode-path tmproot)) nil)
+  (.readBlobTo blob-store (master-stormconf-key storm-id) 
(FileOutputStream. (supervisor-stormconf-path tmproot)) nil)
+  (finally
+(.shutdown blob-store)))
+(FileUtils/moveDirectory (File. tmproot) (File. stormroot))
+(setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) 
stormroot)
+(let [classloader (.getContextClassLoader (Thread/currentThread))
+  resources-jar (resources-jar)
+  url (.getResource classloader RESOURCES-SUBDIR)
+  target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)]
+  (cond
+resources-jar
+(do
+  (log-message "Extracting resources from jar at " resources-jar " 
to " target-dir)
+  (extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot))
+url
+(do
+  (log-message "Copying resources at " (str url) " to " target-dir)
+  (FileUtils/copyDirectory (File. (.getFile url)) (File. 
target-dir))
+  )
--- End diff --

danglers


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

2015-11-17 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/838#discussion_r45116436
  
--- Diff: storm-core/src/jvm/backtype/storm/Config.java ---
@@ -778,6 +792,47 @@
 public static final String UI_HTTPS_NEED_CLIENT_AUTH = 
"ui.https.need.client.auth";
 
 /**
+ * The host that Pacemaker is running on.
+ */
+@isString
+public static final String PACEMAKER_HOST = "pacemaker.host";
+
+/**
+ * The port Pacemaker should run on. Clients should
+ * connect to this port to submit or read heartbeats.
+ */
+@isNumber
+@isPositiveNumber
+public static final String PACEMAKER_PORT = "pacemaker.port";
+
+/**
+ * The maximum number of threads that should be used by the Pacemaker.
+ * When Pacemaker gets loaded it will spawn new threads, up to 
+ * this many total, to handle the load.
+ */
+@isNumber
+@isPositiveNumber
+public static final String PACEMAKER_MAX_THREADS = 
"pacemaker.max.threads";
+
+/**
+ * This parameter is used by the storm-deploy project to configure the
+ * jvm options for the nimbus daemon.
--- End diff --

*nimbus* daemon


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

2015-11-17 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/838#discussion_r45125877
  
--- Diff: storm-core/src/jvm/backtype/storm/Config.java ---
@@ -778,6 +792,47 @@
 public static final String UI_HTTPS_NEED_CLIENT_AUTH = 
"ui.https.need.client.auth";
 
 /**
+ * The host that Pacemaker is running on.
+ */
+@isString
+public static final String PACEMAKER_HOST = "pacemaker.host";
+
+/**
+ * The port Pacemaker should run on. Clients should
+ * connect to this port to submit or read heartbeats.
+ */
+@isNumber
+@isPositiveNumber
+public static final String PACEMAKER_PORT = "pacemaker.port";
+
+/**
+ * The maximum number of threads that should be used by the Pacemaker.
+ * When Pacemaker gets loaded it will spawn new threads, up to 
+ * this many total, to handle the load.
+ */
+@isNumber
+@isPositiveNumber
+public static final String PACEMAKER_MAX_THREADS = 
"pacemaker.max.threads";
+
+/**
+ * This parameter is used by the storm-deploy project to configure the
+ * jvm options for the nimbus daemon.
--- End diff --

@erikdw Yep. This was a note for myself. (this is my PR) :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

2015-11-17 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/838#issuecomment-157542893
  
@ptgoetz I added some documentation at docs/documentation/Pacemaker.md


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

2015-11-18 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/838#issuecomment-157862156
  
For the record, the netty messaging layer changes are only to achieve 2 
things that were necessary to make the heartbeats work.
 - Add Kerberos SASL plugin for the Netty pipeline
 - Facilitate generic serialization with INettySerializable


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

2015-11-19 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/838#issuecomment-158120301
  
@ptgoetz 
I've added a couple of sections for that stuff to the bottom of the 
documentation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

2015-11-19 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/838#issuecomment-158120792
  
I'd like to wait for at least one more review and +1 before I merge, since 
both Bobby and Kishor worked with me on the design of the system.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: MINOR: Use /usr/bin/env to find bash

2015-11-19 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/892#issuecomment-158126906
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

2015-11-20 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/838#discussion_r45515488
  
--- Diff: storm-core/src/clj/org/apache/storm/pacemaker/pacemaker.clj ---
@@ -0,0 +1,239 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.pacemaker.pacemaker
+  (:import [org.apache.storm.pacemaker PacemakerServer 
IServerMessageHandler]
+   [java.util.concurrent ConcurrentHashMap ThreadPoolExecutor 
TimeUnit LinkedBlockingDeque]
+   [java.util.concurrent.atomic AtomicInteger]
+   [java.util Date]
+   [backtype.storm.generated
+HBAuthorizationException HBExecutionException HBNodes HBRecords
+HBServerMessageType HBMessage HBMessageData HBPulse])
+  (:use [clojure.string :only [replace-first split]]
+[backtype.storm log config util])
+  (:require [clojure.java.jmx :as jmx])
+  (:gen-class))
+
+
+;; Stats Functions
+
+(def sleep-seconds 60)
+
+
+(defn- check-and-set-loop [stats key new & {:keys [compare new-fn]
+:or {compare (fn [new old] 
true)
+ new-fn (fn [new old] 
new)}}]
+  (loop []
+(let [old (.get (key stats))
+  new (new-fn new old)]
+  (if (compare new old)
+(if (.compareAndSet (key stats) old new)
+  nil
+  (recur))
+nil
+
+(defn- set-average [stats size]
+  (check-and-set-loop
+   stats
+   :average-heartbeat-size
+   size
+   :new-fn (fn [new old]
+(let [count (.get (:send-pulse-count stats))]
+; Weighted average
+  (/ (+ new (* count old)) (+ count 1))
+
+(defn- set-largest [stats size]
+  (check-and-set-loop
+   stats
+   :largest-heartbeat-size
+   size
+   :compare #'>))
+
+(defn- report-stats [heartbeats stats last-five-s]
+  (loop []
+  (let [send-count (.getAndSet (:send-pulse-count stats) 0)
+received-size (.getAndSet (:total-received-size stats) 0)
+get-count (.getAndSet (:get-pulse-count stats) 0)
+sent-size (.getAndSet (:total-sent-size stats) 0)
+largest (.getAndSet (:largest-heartbeat-size stats) 0)
+average (.getAndSet (:average-heartbeat-size stats) 0)
+total-keys (.size heartbeats)]
+(log-debug "\nReceived " send-count " heartbeats totaling " 
received-size " bytes,\n"
+   "Sent " get-count " heartbeats totaling " sent-size " 
bytes,\n"
+   "The largest heartbeat was " largest " bytes,\n"
+   "The average heartbeat was " average " bytes,\n"
+   "Pacemaker contained " total-keys " total keys\n"
+   "in the last " sleep-seconds " second(s)")
+(dosync (ref-set last-five-s
+ {:send-pulse-count send-count
+  :total-received-size received-size
+  :get-pulse-count get-count
+  :total-sent-size sent-size
+  :largest-heartbeat-size largest
+  :average-heartbeat-size average
+  :total-keys total-keys})))
+  (Thread/sleep (* 1000 sleep-seconds))
+  (recur)))
+
+;; JMX stuff
+(defn register [last-five-s]
+  (jmx/register-mbean
+   (jmx/create-bean
+last-five-s)
+   "org.apache.storm.pacemaker.pacemaker:stats=Stats_Last_5_Seconds"))
+
+
+;; Pacemaker Functions
+
+(defn hb-data [conf]
+  (ConcurrentHashMap.))
+
+(defn create-path [^String path heartbeats]
+  (HBMessage

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

2015-11-20 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/838#discussion_r45515660
  
--- Diff: 
storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj ---
@@ -0,0 +1,124 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.pacemaker.pacemaker-state-factory
+  (:require [org.apache.storm.pacemaker pacemaker]
+[backtype.storm.cluster-state [zookeeper-state-factory :as 
zk-factory]]
+[backtype.storm
+ [config :refer :all]
+ [cluster :refer :all]
+ [log :refer :all]
+ [util :as util]])
+  (:import [backtype.storm.generated
+HBExecutionException HBNodes HBRecords
+HBServerMessageType HBMessage HBMessageData HBPulse]
+   [backtype.storm.cluster_state zookeeper_state_factory]
+   [backtype.storm.cluster ClusterState]
+   [org.apache.storm.pacemaker PacemakerClient])
+  (:gen-class
+   :implements [backtype.storm.cluster.ClusterStateFactory]))
+
+;; So we can mock the client for testing
+(defn makeClient [conf]
+  (PacemakerClient. conf))
+
+(defn makeZKState [conf auth-conf acls context]
+  (.mkState (zookeeper_state_factory.) conf auth-conf acls context))
+
+(def max-retries 10)
+
+(defn -mkState [this conf auth-conf acls context]
+  (let [zk-state (makeZKState conf auth-conf acls context)
+pacemaker-client (makeClient conf)]
+
+(reify
+  ClusterState
+  ;; Let these pass through to the zk-state. We only want to handle 
heartbeats.
+  (register [this callback] (.register zk-state callback))
+  (unregister [this callback] (.unregister zk-state callback))
+  (set_ephemeral_node [this path data acls] (.set_ephemeral_node 
zk-state path data acls))
+  (create_sequential [this path data acls] (.create_sequential 
zk-state path data acls))
+  (set_data [this path data acls] (.set_data zk-state path data acls))
+  (delete_node [this path] (.delete_node zk-state path))
+  (get_data [this path watch?] (.get_data zk-state path watch?))
+  (get_data_with_version [this path watch?] (.get_data_with_version 
zk-state path watch?))
+  (get_version [this path watch?] (.get_version zk-state path watch?))
+  (get_children [this path watch?] (.get_children zk-state path 
watch?))
+  (mkdirs [this path acls] (.mkdirs zk-state path acls))
+  (node_exists [this path watch?] (.node_exists zk-state path watch?))
+  (add_listener [this listener] (.add_listener zk-state listener))
+  (sync_path [this path] (.sync_path zk-state path))
+  
+  (set_worker_hb [this path data acls]
+(util/retry-on-exception
+ max-retries
+ "set_worker_hb"
+ #(let [response
+(.send pacemaker-client
+   (HBMessage. HBServerMessageType/SEND_PULSE
+   (HBMessageData/pulse
+(doto (HBPulse.)
+  (.set_id path)
+  (.set_details data)]
+(if (= (.get_type response) 
HBServerMessageType/SEND_PULSE_RESPONSE)
+  :ok
+  (throw (HBExecutionException. "Invalid Response Type"))
--- End diff --

It's part of the ClusterState interface spec.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

2015-11-23 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/838#discussion_r45642140
  
--- Diff: storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java ---
@@ -276,4 +313,26 @@ public static String get(Configuration configuration, 
String section, String key
 }
 return null;
 }
+
+private static final String USERNAME = "username";
+private static final String PASSWORD = "password";
+
+public static String makeDigestPayload(Configuration login_config, 
String config_section) {
--- End diff --

Fixed to generate SHA-512 digest of the username and password.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

2015-11-23 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/838#discussion_r45646121
  
--- Diff: 
storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj ---
@@ -0,0 +1,124 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.pacemaker.pacemaker-state-factory
+  (:require [org.apache.storm.pacemaker pacemaker]
+[backtype.storm.cluster-state [zookeeper-state-factory :as 
zk-factory]]
+[backtype.storm
+ [config :refer :all]
+ [cluster :refer :all]
+ [log :refer :all]
+ [util :as util]])
+  (:import [backtype.storm.generated
+HBExecutionException HBNodes HBRecords
+HBServerMessageType HBMessage HBMessageData HBPulse]
+   [backtype.storm.cluster_state zookeeper_state_factory]
+   [backtype.storm.cluster ClusterState]
+   [org.apache.storm.pacemaker PacemakerClient])
+  (:gen-class
+   :implements [backtype.storm.cluster.ClusterStateFactory]))
--- End diff --

Removed (for some reason isn't showing up here.)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

2015-11-23 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/838#discussion_r45646200
  
--- Diff: 
storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslServerHandler.java
 ---
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosSaslServerHandler extends 
SimpleChannelUpstreamHandler {
+
+ISaslServer server;
+/** Used for client or server's token to send or receive from each 
other. */
+private Map storm_conf;
+private String jaas_section;
+private List authorizedUsers;
+
+private static final Logger LOG = LoggerFactory
+.getLogger(KerberosSaslServerHandler.class);
+
+public KerberosSaslServerHandler(ISaslServer server, Map storm_conf, 
String jaas_section, List authorizedUsers) throws IOException {
+this.server = server;
+this.storm_conf = storm_conf;
+this.jaas_section = jaas_section;
+this.authorizedUsers = authorizedUsers;
+}
+
+@Override
+public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+throws Exception {
+Object msg = e.getMessage();
+if (msg == null)
+return;
+
+Channel channel = ctx.getChannel();
+
+
+if (msg instanceof SaslMessageToken) {
+// initialize server-side SASL functionality, if we haven't yet
+// (in which case we are looking at the first SASL message 
from the
+// client).
+try {
+LOG.debug("Got SaslMessageToken!");
+
+KerberosSaslNettyServer saslNettyServer = 
KerberosSaslNettyServerState.getKerberosSaslNettyServer
+.get(channel);
+if (saslNettyServer == null) {
+LOG.debug("No saslNettyServer for {}  yet; creating 
now, with topology token: ", channel);
+try {
+saslNettyServer = new 
KerberosSaslNettyServer(storm_conf, jaas_section, authorizedUsers);
+} catch (RuntimeException ioe) {
+LOG.error("Error occurred while creating 
saslNettyServer on server {} for client {}",
+  channel.getLocalAddress(), 
channel.getRemoteAddress());
+saslNettyServer = null;
+}
+
+
KerberosSaslNettyServerState.getKerberosSaslNettyServer.set(channel,
+   
 saslNettyServer);
+} else {
+LOG.debug("Found existing saslNettyServer on server: 
{} for client {}",
+  channel.getLocalAddress(), 
channel.getRemoteAddress());
+}
+
+byte[] responseBytes = 
saslNettyServer.response(((SaslMessageToken) msg)
+
.getSaslToken());
--- End diff --

Fixed above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

2015-11-23 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/838#discussion_r45646317
  
--- Diff: storm-core/src/clj/org/apache/storm/pacemaker/pacemaker.clj ---
@@ -0,0 +1,239 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.pacemaker.pacemaker
+  (:import [org.apache.storm.pacemaker PacemakerServer 
IServerMessageHandler]
+   [java.util.concurrent ConcurrentHashMap ThreadPoolExecutor 
TimeUnit LinkedBlockingDeque]
+   [java.util.concurrent.atomic AtomicInteger]
+   [java.util Date]
+   [backtype.storm.generated
+HBAuthorizationException HBExecutionException HBNodes HBRecords
+HBServerMessageType HBMessage HBMessageData HBPulse])
+  (:use [clojure.string :only [replace-first split]]
+[backtype.storm log config util])
+  (:require [clojure.java.jmx :as jmx])
+  (:gen-class))
--- End diff --

Removed (for some reason isn't showing up here.)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1340: Use Travis-CI build matrix to impr...

2015-11-23 Thread knusbaum
GitHub user knusbaum opened a pull request:

https://github.com/apache/storm/pull/902

STORM-1340: Use Travis-CI build matrix to improve test execution times

This patch modifies the Travis-CI config to run storm-core unit tests in 
parallel with all the external tests, knocking between 4 and 7 minutes off the 
build time usually.

It also caches ruby, javascript, and java libraries instead of downloading 
them each time a container is launched.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/knusbaum/incubator-storm STORM-1340

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/902.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #902


commit a0e734727d2335bb0e7a12f8ada8ca07f4908ded
Author: Kyle Nusbaum 
Date:   2015-11-10T20:27:21Z

Adding setting to travis.

commit 146a893647267736b01c458dde9a1b0150194b67
Author: Kyle Nusbaum 
Date:   2015-11-10T22:01:14Z

Add caching to speed up build.

commit 56297183c86404fb4fe3d623e47808c07b961471
Author: Kyle Nusbaum 
Date:   2015-11-10T22:17:58Z

Kicking Travis

commit f0630adb057a4f3dc321e51d14610639753d07e7
Author: Kyle Nusbaum 
Date:   2015-11-10T22:40:19Z

Testing travis matrix.

commit 103f71a00a6c41a83349f39a420c2a2d0807ddb7
Author: Kyle Nusbaum 
Date:   2015-11-10T22:44:24Z

Testing

Testing

Testing

Testing

Testing

Testing

Testing

Testing

Fixing .travis.yml

commit e83678b8fba07f921b661ab0b2a40616726dec55
Author: Kyle Nusbaum 
Date:   2015-11-11T00:13:43Z

Cleanup

commit 77417554f1880e113177c874e45ed5721f94b06e
Author: Kyle Nusbaum 
Date:   2015-11-19T17:31:52Z

Testing split-up travis.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1340: Use Travis-CI build matrix to impr...

2015-11-23 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/902#issuecomment-159092312
  
Failure looks unrelated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1340: Use Travis-CI build matrix to impr...

2015-11-24 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/902#issuecomment-159314711
  
@HeartSaVioR Would you like to look at this? 
It's similar to the PR I put up before.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-24 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45790060
  
--- Diff: 
storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java ---
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the key sequence number which implies the number of 
updates made to a blob.
+ * The information regarding the keys and the sequence number which 
represents the number of updates are
+ * stored within the zookeeper in the following format.
+ * /storm/blobstore/key_name/nimbushostport-sequencenumber
+ * Example:
+ * If there are two nimbodes with nimbus.seeds:leader,non-leader are set,
+ * then the state inside the zookeeper is eventually stored as:
+ * /storm/blobstore/key1/leader:8080-1
+ * /storm/blobstore/key1/non-leader:8080-1
+ * indicates that a new blob with the name key1 has been created on the 
leader
+ * nimbus and the non-leader nimbus syncs after a call back is triggered 
by attempting
+ * to download the blob and finally updates its state inside the zookeeper.
+ *
+ * A watch is placed on the /storm/blobstore/key1 and the znodes 
leader:8080-1 and
+ * non-leader:8080-1 are ephemeral which implies that these nodes exist 
only until the
+ * connection between the corresponding nimbus and the zookeeper persist. 
If in case the
+ * nimbus crashes the node disappears under /storm/blobstore/key1.
+ *
+ * The sequence number for the keys are handed over based on the following 
scenario:
+ * Lets assume there are three nimbodes up and running, one being the 
leader and the other
+ * being the non-leader.
+ *
+ * 1. Create is straight forward.
+ * Check whether the znode -> /storm/blobstore/key1 has been created or 
not. It implies
+ * the blob has not been created yet. If not created, it creates it and 
updates the zookeeper
+ * states under /storm/blobstore/key1 and 
/storm/blobstoremaxkeysequencenumber/key1.
+ * The znodes it creates on these nodes are 
/storm/blobstore/key1/leader:8080-1,
+ * /storm/blobstore/key1/non-leader:8080-1 and 
/storm/blobstoremaxkeysequencenumber/key1/1.
+ * The later holds the global sequence number across all nimbodes more 
like a static variable
+ * indicating the true value of number of updates for a blob. This node 
helps to maintain sanity in case
+ * leadership changes due to crashing.
+ *
+ * 2. Delete does not require to hand over the sequence number.
+ *
+ * 3. Finally, the update has few scenarios.
+ *
+ *  The class implements a TreeSet. The basic idea is if all the nimbodes 
have the same
+ *  sequence number for the blob, then the number of elements in the set 
is 1 which holds
+ *  the latest value of sequence number. If the number of elements are 
greater than 1 then it
+ *  implies that there is sequence mismatch and there is need for syncing 
the blobs across
+ *  nimbodes.
+ *
+ *  The logic for handing over sequence numbers based on the state are 
described as follows
+ *  Here consider Nimbus-1 alias as N1 and Nimbus-2 alias as N2.
+ *  Scenario 1:
+ *  Example: Normal create/update scenario
+ *  Operation Nimbus-1:state Nimbus-2:state Seq-Num-Nimbus-1  
Seq-Num-Nimbus-2  Max-Seq-Num
+ *  Create-Key1   alive - Leader alive  1  
 1
+ *  Sync  alive - Leader alive  1 
1 (callback -> download)  1
+ *  Up

[GitHub] storm pull request: STORM-1341 Let topology have own heartbeat tim...

2015-11-25 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/903#issuecomment-159698422
  
+1
Looks fine to me.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-30 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/845#issuecomment-160727100
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1364] logs the storm version when start...

2015-12-03 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/916#issuecomment-161800044
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1377] Adds lock when launching a test c...

2015-12-10 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/940#issuecomment-163765859
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1383] Avoid supervisor crashing if nimb...

2015-12-10 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/938#issuecomment-163768096
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1385] stats.clj divide by zero exceptio...

2015-12-10 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/937#issuecomment-163768792
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1359] change kryo links from google cod...

2015-12-10 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/908#issuecomment-163838439
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1359] change kryo links from google cod...

2015-12-10 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/909#issuecomment-163838627
  
@rfarivar @darionyaphet #908 is a pull to a different branch.

+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-898] - Add priorities and per user reso...

2015-12-15 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/921#discussion_r47664671
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
@@ -540,12 +540,16 @@
 topology (read-storm-topology-as-nimbus storm-id blob-store)
 executor->component (->> (compute-executor->component nimbus 
storm-id)
  (map-key (fn [[start-task end-task]]
-(ExecutorDetails. (int 
start-task) (int end-task)]
+(ExecutorDetails. (int 
start-task) (int end-task)
+launch-time-secs (if storm-base (:launch-time-secs storm-base)
--- End diff --

'else' should be on its own line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-898] - Add priorities and per user reso...

2015-12-15 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/921#discussion_r47688725
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java 
---
@@ -403,13 +415,23 @@ public void addResourcesForExec(ExecutorDetails exec, 
Map resour
  * Add default resource requirements for a executor
  */
 public void addDefaultResforExec(ExecutorDetails exec) {
+Double topologyComponentCpuPcorePercent = 
Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), 
null);
+if (topologyComponentCpuPcorePercent == null) {
+LOG.warn("default value for 
topology.component.cpu.pcore.percent needs to be set!");
--- End diff --

What happens if it isn't set?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-898] - Add priorities and per user reso...

2015-12-15 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/921#discussion_r47692234
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java 
---
@@ -403,13 +415,23 @@ public void addResourcesForExec(ExecutorDetails exec, 
Map resour
  * Add default resource requirements for a executor
  */
 public void addDefaultResforExec(ExecutorDetails exec) {
+Double topologyComponentCpuPcorePercent = 
Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), 
null);
+if (topologyComponentCpuPcorePercent == null) {
+LOG.warn("default value for 
topology.component.cpu.pcore.percent needs to be set!");
--- End diff --

Why don't we throw something that's not an NPE here, then? I'd rather fail 
earlier than later.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-898] - Add priorities and per user reso...

2015-12-15 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/921#discussion_r47692998
  
--- Diff: conf/user-resource-pools-example.yaml ---
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+resource.aware.scheduler.user.pools:
+jerry:
+cpu: 1000
+memory: 8192.0
+derek:
+cpu: 1.0
+memory: 32768
--- End diff --

Does this need to be explicitly floating? Or is this intended to 
demonstrate that integers are okay?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-898] - Add priorities and per user reso...

2015-12-15 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/921#discussion_r47696530
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/Cluster.java ---
@@ -430,12 +457,44 @@ public SupervisorDetails getSupervisorById(String 
nodeId) {
 }
 
 /**
+ * set assignments for cluster
+ */
+public void setAssignments(Map 
newAssignments) {
+this.assignments = new HashMap();
+for (Map.Entry entry : 
newAssignments.entrySet()) {
+this.assignments.put(entry.getKey(), new 
SchedulerAssignmentImpl(entry.getValue().getTopologyId(), 
entry.getValue().getExecutorToSlot()));
--- End diff --

Why not add a copyOf method to SchedulerAssignmentImpl? 
`SchedulerAssignmentImpl.copyOf(entry.getValue())`

That should sort of clarify any confusion that you're just making 
`assignments` into a copy of `newAssignments`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-898] - Add priorities and per user reso...

2015-12-15 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/921#discussion_r47697722
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java ---
@@ -193,17 +199,21 @@ public void freeAllSlots(Cluster cluster) {
 /**
  * Frees a single slot in this node
  * @param ws the slot to free
- * @param cluster the cluster to update
  */
-public void free(WorkerSlot ws, Cluster cluster) {
+public void free(WorkerSlot ws) {
+LOG.info("freeing ws {} on node {}", ws, _hostname);
--- End diff --

Can we call it 'slot' or something instead of 'ws' though. Until I read the 
type of the argument, I didn't know what ws was. Seeing that in the logs might 
be confusing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-898] - Add priorities and per user reso...

2015-12-15 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/921#discussion_r47697928
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java ---
@@ -230,6 +241,61 @@ public void freeTopology(String topId, Cluster 
cluster) {
 _topIdToUsedSlots.remove(topId);
 }
 
+private void freeMemory(double amount) {
+_availMemory += amount;
+LOG.debug("freeing {} memory on node {}...avail mem: {}", amount, 
this.getHostname(), _availMemory);
+if (_availMemory > this.getTotalMemoryResources()) {
+LOG.warn("Freeing more memory than there exists!");
+}
+}
+
+private void freeCPU(double amount) {
+_availCPU += amount;
+LOG.debug("freeing {} CPU on node...avail CPU: {}", amount, 
this.getHostname(), _availCPU);
+if (_availCPU > this.getAvailableCpuResources()) {
+LOG.warn("Freeing more memory than there exists!");
--- End diff --

"more memory" -> "more CPU"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-898] - Add priorities and per user reso...

2015-12-15 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/921#discussion_r47698225
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java ---
@@ -193,17 +199,21 @@ public void freeAllSlots(Cluster cluster) {
 /**
  * Frees a single slot in this node
  * @param ws the slot to free
- * @param cluster the cluster to update
  */
-public void free(WorkerSlot ws, Cluster cluster) {
+public void free(WorkerSlot ws) {
+LOG.info("freeing ws {} on node {}", ws, _hostname);
 if (_freeSlots.contains(ws)) return;
 for (Entry> entry : 
_topIdToUsedSlots.entrySet()) {
 Set slots = entry.getValue();
+double memUsed = this.getMemoryUsedByWorker(ws);
--- End diff --

+1 for this along with all the other method calls in this file. Your 
argument for using `this` for member variables I can understand (even if I 
disagree), but using `this` for method calls doesn't convey anything and is 
just clutter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-898] - Add priorities and per user reso...

2015-12-15 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/921#discussion_r47698573
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java ---
@@ -72,10 +71,9 @@ public RAS_Node(String nodeId, Set allPorts, 
boolean isAlive,
 _availMemory = this.getTotalMemoryResources();
 _availCPU = this.getTotalCpuResources();
 _slots.addAll(_freeSlots);
-for (WorkerSlot ws : _slots) {
-_slotToExecs.put(ws, new ArrayList());
-}
 }
+this._cluster = cluster;
--- End diff --

Can we also not use `this` for member variables that have been tagged as 
such by naming convention? That's exactly what the underscore is intended to 
communicate. There's no benefit to using both. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-898] - Add priorities and per user reso...

2015-12-15 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/921#discussion_r47710177
  
--- Diff: conf/user-resource-pools-example.yaml ---
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+resource.aware.scheduler.user.pools:
+jerry:
+cpu: 1000
+memory: 8192.0
+derek:
+cpu: 1.0
+memory: 32768
--- End diff --

+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1393] Update the storm.log.dir function...

2015-12-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/949#discussion_r47804494
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
@@ -1022,7 +1022,7 @@
   storm-home (System/getProperty "storm.home")
   storm-options (System/getProperty "storm.options")
   storm-conf-file (System/getProperty "storm.conf.file")
-  storm-log-dir (or (System/getProperty "storm.log.dir") (str 
storm-home file-path-separator "logs"))
+  storm-log-dir backtype.storm.config/LOG-DIR
--- End diff --

Here as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1393] Update the storm.log.dir function...

2015-12-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/949#discussion_r47804454
  
--- Diff: storm-core/src/clj/backtype/storm/config.clj ---
@@ -258,8 +264,8 @@
  (if workers-artifacts-dir
(if (is-absolute-path? workers-artifacts-dir)
  workers-artifacts-dir
- (str backtype.storm.util/LOG-DIR file-path-separator 
workers-artifacts-dir))
-   (str backtype.storm.util/LOG-DIR file-path-separator 
"workers-artifacts"
+ (str backtype.storm.config/LOG-DIR file-path-separator 
workers-artifacts-dir))
+   (str backtype.storm.config/LOG-DIR file-path-separator 
"workers-artifacts"
--- End diff --

We can just remove the `backtype.storm.config/` from these. They don't need 
an explicit namespace because they're in the same namespace.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1393] Update the storm.log.dir function...

2015-12-16 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/949#issuecomment-165180857
  
Minor nits. Everything else OK.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1388] Fix url and email links in README...

2015-12-16 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/943#issuecomment-165205623
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-898] - Add priorities and per user reso...

2015-12-16 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/921#discussion_r47833630
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java 
---
@@ -396,34 +410,50 @@ public void addResourcesForExec(ExecutorDetails exec, 
Map resour
 LOG.warn("Executor {} already exists...ResourceList: {}", 
exec, getTaskResourceReqList(exec));
 return;
 }
-_resourceList.put(exec, resourceList);
+this.resourceList.put(exec, resourceList);
 }
 
 /**
  * Add default resource requirements for a executor
  */
 public void addDefaultResforExec(ExecutorDetails exec) {
+Double topologyComponentCpuPcorePercent = 
Utils.getDouble(this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
 null);
+if (topologyComponentCpuPcorePercent == null) {
+LOG.warn("default value for " + 
Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT + " needs to be set!");
+}
--- End diff --

This will **only** occur if a developer has deleted an existing value in 
defaults.yaml - that is, they are working on Storm itself, not building a 
topology. The topology config validation will take care of that - There is no 
way a user can submit a topology that will cause this on a properly compiled 
Storm cluster. 

This is determined at compile-time, and will be caught by unit tests. 
Having it caught in unit tests is preferable since it will raise an alert on 
any pull requests that break it, whereas having it here won't raise an alert 
until a cluster is up and someone tries to launch something.

I can still see the usefulness of these checks IF we throw 
RuntimeExceptions here for two main reasons:

1. It might blow up later, and when I look at the logs and see a stack 
trace, I don't go back too far to look for other stuff. So if we don't throw 
here, the users are going to be dealing with the same NPE in a weird spot you 
were trying to avoid.

2. It might *not* blow up later, and instead just exhibit some weird 
behavior. This will be worse because there won't be a stack trace for the user 
to find, and they won't find any errors in the logs. (These are `LOG.warn()` 
calls)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1397: Merge conflict from Pacemaker merg...

2015-12-16 Thread knusbaum
GitHub user knusbaum opened a pull request:

https://github.com/apache/storm/pull/951

STORM-1397: Merge conflict from Pacemaker merge



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/knusbaum/incubator-storm STORM-1397

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/951.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #951


commit 96939d49ff83c99b200c47dd5e077e270a36909a
Author: Kyle Nusbaum 
Date:   2015-12-16T22:46:33Z

Adding context




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1397: Merge conflict from Pacemaker merg...

2015-12-17 Thread knusbaum
Github user knusbaum closed the pull request at:

https://github.com/apache/storm/pull/951


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1397: Merge conflict from Pacemaker merg...

2015-12-17 Thread knusbaum
GitHub user knusbaum reopened a pull request:

https://github.com/apache/storm/pull/951

STORM-1397: Merge conflict from Pacemaker merge

When Pacemaker was merged, some calls got missed in nimbus.clj, 
supervisor.clj, worker.clj and executor.clj.

This merge error prevents nimbus from spawning separate read/write 
zookeeper clients since the ClusterState ends up with daemonType == 
DaemonType.UNKNOWN.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/knusbaum/incubator-storm STORM-1397

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/951.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #951


commit 96939d49ff83c99b200c47dd5e077e270a36909a
Author: Kyle Nusbaum 
Date:   2015-12-16T22:46:33Z

Adding context

commit 6c95887945bd7fe845b9d399b5969d52cf7295da
Author: Kyle Nusbaum 
Date:   2015-12-16T23:00:51Z

Fixing missing includes.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: Update storm download link in storm.py

2015-12-17 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/953#issuecomment-165568183
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1400] adds null check before trying to ...

2015-12-17 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/956#issuecomment-165571017
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1399: Blobstore tests should write data ...

2015-12-17 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/955#discussion_r47956102
  
--- Diff: 
external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
 ---
@@ -76,6 +76,7 @@
 
   @Before
   public void init() {
+System.setProperty("test.build.data", "target/test/data");
 initializeConfigs();
 baseFile = new File("/tmp/blob-store-test-"+UUID.randomUUID());
--- End diff --

Why are we not also changing here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1398: Add back in TopologyDetails.getTop...

2015-12-17 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/954#issuecomment-165572783
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [YSTORM-1396] Added backward compatibility met...

2015-12-17 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/952#discussion_r47956485
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
@@ -1716,15 +1716,13 @@
   (.remove uploaders location)
   ))
 
-  (^String beginFileDownload [this ^String file]
-(mark! nimbus:num-beginFileDownload-calls)
+  (^String beginFileDownload
+[this ^String file]
 (check-authorization! nimbus nil nil "fileDownload")
-(check-file-access (:conf nimbus) file)
-(let [is (BufferFileInputStream. file)
+(let [is (BufferInputStream. (.getBlob (:blob-store nimbus) file 
nil) ^Integer (Utils/getInt (conf 
STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES) (int 65536)))
--- End diff --

Can we break this line up? Very hard to read.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1393] Update the storm.log.dir function...

2015-12-17 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/949#issuecomment-165576147
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1372] Merging design and usage document...

2015-12-17 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/945#issuecomment-165576433
  
+1



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1383] Avoid supervisor crashing if nimb...

2015-12-17 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/938#issuecomment-165579313
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1395: move JUnit dependency to top-level...

2015-12-17 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/950#issuecomment-165581256
  
24-hours is up. I'm going to merge this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1376] Performance slowdown due excessiv...

2015-12-17 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/933#issuecomment-165582546
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1381:Adding client side submission hook ...

2015-12-17 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/935#issuecomment-165592565
  
+1 Unit tests pass locally.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: storm-starter: Guide JDK version to later than...

2016-01-05 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/988#issuecomment-169064420
  
Do we want to mention that 8 is equally supported? Maybe say 'JDK 7 and 
above' or something?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1429: LocalizerTest fix

2016-01-05 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/987#issuecomment-169070441
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1432: Spurious failure in storm-kafka te...

2016-01-05 Thread knusbaum
GitHub user knusbaum opened a pull request:

https://github.com/apache/storm/pull/990

STORM-1432: Spurious failure in storm-kafka test

ExponentialBackoffMsgRetryManagerTest.testMaxBackoff() fails sometimes, I 
think due to too small times. It can miss the mark on slow machines.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/knusbaum/incubator-storm STORM-1432

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/990.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #990


commit 6712e8d7e4cf48e045eb0d46187291296d99a1f2
Author: Kyle Nusbaum 
Date:   2016-01-05T18:22:23Z

Fixing STORM-1432




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1432: Spurious failure in storm-kafka te...

2016-01-05 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/990#issuecomment-169098388
  
@revans2 I'll keep testing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1432: Spurious failure in storm-kafka te...

2016-01-05 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/990#issuecomment-169098913
  
That looks like a different test entirely that has the same problem. I'll 
try to fix that one as well.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: storm-starter: Guide JDK version to later than...

2016-01-05 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/988#issuecomment-169156535
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: Update BYLAWS.md for clarification of code-cha...

2016-01-12 Thread knusbaum
Github user knusbaum closed the pull request at:

https://github.com/apache/storm/pull/511


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1467: Switch apache-rat plugin off by de...

2016-01-12 Thread knusbaum
GitHub user knusbaum opened a pull request:

https://github.com/apache/storm/pull/1006

STORM-1467: Switch apache-rat plugin off by default, but enable for 
Travis-CI

I added an un-licensed file in to show that the CI will fail. I'll remove 
it before merge.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/knusbaum/incubator-storm STORM-1467

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1006.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1006


commit 4c2b17c0a4fef0d91a3bc661f8af748ceb527d2b
Author: Kyle Nusbaum 
Date:   2016-01-12T17:58:07Z

Testing.

commit a723485272ff689abd4305d17bc46fe32a6490e0
Author: Kyle Nusbaum 
Date:   2016-01-12T18:00:28Z

Testing.

commit d43b64cf7aaeb37867f425d973170e4fa8c92244
Author: Kyle Nusbaum 
Date:   2016-01-12T18:17:18Z

Adding dumb file for the rat to find.

commit e34db41cc93be24dee41bf2e66937d4031ab4ccc
Author: Kyle Nusbaum 
Date:   2016-01-12T20:29:32Z

Testing.

commit 99ea06fb1e9b1b2114d860198368c31775b4344e
Author: Kyle Nusbaum 
Date:   2016-01-12T20:40:57Z

Fix & Test.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1466: Move the org.apache.thrift7 namesp...

2016-01-12 Thread knusbaum
GitHub user knusbaum opened a pull request:

https://github.com/apache/storm/pull/1007

STORM-1466: Move the org.apache.thrift7 namespace to something 
correct/sensible



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/knusbaum/incubator-storm STORM-1466

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1007.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1007


commit ab11ce97786c2eef523c04684b86b67b7902dca8
Author: Kyle Nusbaum 
Date:   2016-01-12T22:58:17Z

Move the org.apache.thrift7 namespace to something correct/sensible




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1466: Move the org.apache.thrift7 namesp...

2016-01-13 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/1007#issuecomment-171368349
  
I'm also going to cherry-pick this back into the 1.0 branch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1227] port org.apache.storm.config to j...

2016-01-21 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/1030#discussion_r50453900
  
--- Diff: storm-core/src/clj/org/apache/storm/config.clj ---
@@ -41,291 +38,18 @@
   (dofor [f (seq (.getFields Config))]
  (.get f nil)))
 
-
+;; TODO this function and its callings will be replace when nimbus and 
supervisor move to Java
 (defn cluster-mode
   [conf & args]
   (keyword (conf STORM-CLUSTER-MODE)))
 
-(defn local-mode?
-  [conf]
-  (let [mode (conf STORM-CLUSTER-MODE)]
-(condp = mode
-  "local" true
-  "distributed" false
-  (throw (IllegalArgumentException.
-   (str "Illegal cluster mode in conf: " mode))
-
 (defn sampling-rate
   [conf]
   (->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
-   (/ 1)
-   int))
+(/ 1)
+int))
 
+;; TODO this function together with sampling-rate are to be replaced with 
Java version when util.clj is in
 (defn mk-stats-sampler
   [conf]
-  (even-sampler (sampling-rate conf)))
-
-(defn read-default-config
-  []
-  (clojurify-structure (Utils/readDefaultConfig)))
-
-(defn validate-configs-with-schemas
-  [conf]
-  (ConfigValidation/validateFields conf))
-
-(defn read-storm-config
-  []
-  (let [conf (clojurify-structure (Utils/readStormConfig))]
-(validate-configs-with-schemas conf)
-conf))
-
-(defn read-yaml-config
-  ([name must-exist]
- (let [conf (clojurify-structure (Utils/findAndReadConfigFile name 
must-exist))]
-   (validate-configs-with-schemas conf)
-   conf))
-  ([name]
- (read-yaml-config true)))
-
-(defn absolute-storm-local-dir [conf]
-  (let [storm-home (System/getProperty "storm.home")
-path (conf STORM-LOCAL-DIR)]
-(if path
-  (if (is-absolute-path? path) path (str storm-home 
file-path-separator path))
-  (str storm-home file-path-separator "storm-local"
-
-(def LOG-DIR
-  (.getCanonicalPath
-(clojure.java.io/file (or (System/getProperty "storm.log.dir")
-  (get (read-storm-config) "storm.log.dir")
-  (str (System/getProperty "storm.home") 
file-path-separator "logs")
-
-(defn absolute-healthcheck-dir [conf]
-  (let [storm-home (System/getProperty "storm.home")
-path (conf STORM-HEALTH-CHECK-DIR)]
-(if path
-  (if (is-absolute-path? path) path (str storm-home 
file-path-separator path))
-  (str storm-home file-path-separator "healthchecks"
-
-(defn master-local-dir
-  [conf]
-  (let [ret (str (absolute-storm-local-dir conf) file-path-separator 
"nimbus")]
-(FileUtils/forceMkdir (File. ret))
-ret))
-
-(defn master-stormjar-key
-  [topology-id]
-  (str topology-id "-stormjar.jar"))
-
-(defn master-stormcode-key
-  [topology-id]
-  (str topology-id "-stormcode.ser"))
-
-(defn master-stormconf-key
-  [topology-id]
-  (str topology-id "-stormconf.ser"))
-
-(defn master-stormdist-root
-  ([conf]
-   (str (master-local-dir conf) file-path-separator "stormdist"))
-  ([conf storm-id]
-   (str (master-stormdist-root conf) file-path-separator storm-id)))
-
-(defn master-tmp-dir
-  [conf]
-  (let [ret (str (master-local-dir conf) file-path-separator "tmp")]
-(FileUtils/forceMkdir (File. ret))
-ret ))
-
-(defn read-supervisor-storm-conf-given-path
-  [conf stormconf-path]
-  (merge conf (clojurify-structure (Utils/fromCompressedJsonConf 
(FileUtils/readFileToByteArray (File. stormconf-path))
-
-(defn master-storm-metafile-path [stormroot ]
-  (str stormroot file-path-separator "storm-code-distributor.meta"))
-
-(defn master-stormjar-path
-  [stormroot]
-  (str stormroot file-path-separator "stormjar.jar"))
-
-(defn master-stormcode-path
-  [stormroot]
-  (str stormroot file-path-separator "stormcode.ser"))
-
-(defn master-stormconf-path
-  [stormroot]
-  (str stormroot file-path-separator "stormconf.ser"))
-
-(defn master-inbox
-  [conf]
-  (let [ret (str (master-local-dir conf) file-path-separator "inbox")]
-(FileUtils/forceMkdir (File. ret))
-ret ))
-
-(defn master-inimbus-dir
-  [conf]
-  (str (master-local-dir conf) file-path-separator "inimbus"))
-
-(defn supervisor-local-dir
-  [conf]
-  (let [ret (str (absolute-storm-local-dir conf) file-

[GitHub] storm pull request: [STORM-1227] port org.apache.storm.config to j...

2016-01-21 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/1030#discussion_r50455607
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java ---
@@ -0,0 +1,678 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.storm.Config;
+import org.apache.storm.validation.ConfigValidation;
+import org.apache.storm.generated.StormTopology;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Collections;
+import java.net.URLEncoder;
+
+public class ConfigUtils {
+private final static Logger LOG = 
LoggerFactory.getLogger(ConfigUtils.class);
+public final static String RESOURCES_SUBDIR = "resources";
+public final static String NIMBUS_DO_NOT_REASSIGN = 
"NIMBUS-DO-NOT-REASSIGN";
+public static final String FILE_SEPARATOR = File.separator;
+
+public static String getLogDir() {
+String dir;
+Map conf;
+if (System.getProperty("storm.log.dir") != null) {
+dir = System.getProperty("storm.log.dir");
--- End diff --

Can we avoid calling System.getProperty twice by assigning at declaration 
of dir?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1227] port org.apache.storm.config to j...

2016-01-21 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/1030#discussion_r50455998
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java ---
@@ -0,0 +1,678 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.storm.Config;
+import org.apache.storm.validation.ConfigValidation;
+import org.apache.storm.generated.StormTopology;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Collections;
+import java.net.URLEncoder;
+
+public class ConfigUtils {
+private final static Logger LOG = 
LoggerFactory.getLogger(ConfigUtils.class);
+public final static String RESOURCES_SUBDIR = "resources";
+public final static String NIMBUS_DO_NOT_REASSIGN = 
"NIMBUS-DO-NOT-REASSIGN";
+public static final String FILE_SEPARATOR = File.separator;
+
+public static String getLogDir() {
+String dir;
+Map conf;
+if (System.getProperty("storm.log.dir") != null) {
+dir = System.getProperty("storm.log.dir");
+} else if ((conf = readStormConfig()).get("storm.log.dir") != 
null) {
+dir = String.valueOf(conf.get("storm.log.dir"));
+} else {
+if (System.getProperty("storm.home") != null) {
--- End diff --

No need for nested if here. Make an else-if and an else.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1227] port org.apache.storm.config to j...

2016-01-21 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/1030#discussion_r50457931
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java ---
@@ -0,0 +1,678 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.storm.Config;
+import org.apache.storm.validation.ConfigValidation;
+import org.apache.storm.generated.StormTopology;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Collections;
+import java.net.URLEncoder;
+
+public class ConfigUtils {
+private final static Logger LOG = 
LoggerFactory.getLogger(ConfigUtils.class);
+public final static String RESOURCES_SUBDIR = "resources";
+public final static String NIMBUS_DO_NOT_REASSIGN = 
"NIMBUS-DO-NOT-REASSIGN";
+public static final String FILE_SEPARATOR = File.separator;
+
+public static String getLogDir() {
+String dir;
+Map conf;
+if (System.getProperty("storm.log.dir") != null) {
+dir = System.getProperty("storm.log.dir");
+} else if ((conf = readStormConfig()).get("storm.log.dir") != 
null) {
+dir = String.valueOf(conf.get("storm.log.dir"));
+} else {
+if (System.getProperty("storm.home") != null) {
+dir = System.getProperty("storm.home") + FILE_SEPARATOR + 
"logs";
+} else {
+dir = FILE_SEPARATOR + "logs";
+}
+}
+try {
+return new File(dir).getCanonicalPath();
+} catch (IOException ex) {
+throw new IllegalArgumentException("Illegal storm.log.dir in 
conf: " + dir);
+}
+}
+
+public static String clojureConfigName(String name) {
+return name.toUpperCase().replace("_", "-");
+}
+
+// ALL-CONFIGS is only used by executor.clj once, do we want to do it 
here? TODO
+public static List All_CONFIGS() {
+List ret = new ArrayList();
+Config config = new Config();
+Class ConfigClass = config.getClass();
+Field[] fields = ConfigClass.getFields();
+for (int i = 0; i < fields.length; i++) {
+try {
+Object obj = fields[i].get(null);
+ret.add(obj);
+} catch (IllegalArgumentException e) {
+LOG.error(e.getMessage(), e);
+} catch (IllegalAccessException e) {
+LOG.error(e.getMessage(), e);
+}
+}
+return ret;
+}
+
+public static String clusterMode(Map conf) {
+String mode = (String)conf.get(Config.STORM_CLUSTER_MODE);
+return mode;
+}
+
+public static boolean isLocalMode(Map conf) {
+String mode = (String)conf.get(Config.STORM_CLUSTER_MODE);
+if (mode != null) {
+if ("local".equals(mode)) {
+return true;
+}
+if ("distributed".equals(mode)) {
+return false;
+}
+}
+throw new IllegalArgumentException("Illegal cluster mode in conf: 
" + mode);
+}
+
+public static int samplingRate(Map conf) {
+double rate = 
Utils.getDouble(conf.get(Config.TOPOLOGY_STATS_SAMPLE_RATE));
+if (rate != 0) {
+return (int) (1 / rate);
+}
+  

[GitHub] storm pull request: [STORM-1227] port org.apache.storm.config to j...

2016-01-21 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/1030#discussion_r50459185
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java ---
@@ -0,0 +1,678 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.storm.Config;
+import org.apache.storm.validation.ConfigValidation;
+import org.apache.storm.generated.StormTopology;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Collections;
+import java.net.URLEncoder;
+
+public class ConfigUtils {
+private final static Logger LOG = 
LoggerFactory.getLogger(ConfigUtils.class);
+public final static String RESOURCES_SUBDIR = "resources";
+public final static String NIMBUS_DO_NOT_REASSIGN = 
"NIMBUS-DO-NOT-REASSIGN";
+public static final String FILE_SEPARATOR = File.separator;
+
+public static String getLogDir() {
+String dir;
+Map conf;
+if (System.getProperty("storm.log.dir") != null) {
+dir = System.getProperty("storm.log.dir");
+} else if ((conf = readStormConfig()).get("storm.log.dir") != 
null) {
+dir = String.valueOf(conf.get("storm.log.dir"));
+} else {
+if (System.getProperty("storm.home") != null) {
+dir = System.getProperty("storm.home") + FILE_SEPARATOR + 
"logs";
+} else {
+dir = FILE_SEPARATOR + "logs";
+}
+}
+try {
+return new File(dir).getCanonicalPath();
+} catch (IOException ex) {
+throw new IllegalArgumentException("Illegal storm.log.dir in 
conf: " + dir);
+}
+}
+
+public static String clojureConfigName(String name) {
+return name.toUpperCase().replace("_", "-");
+}
+
+// ALL-CONFIGS is only used by executor.clj once, do we want to do it 
here? TODO
+public static List All_CONFIGS() {
+List ret = new ArrayList();
+Config config = new Config();
+Class ConfigClass = config.getClass();
+Field[] fields = ConfigClass.getFields();
+for (int i = 0; i < fields.length; i++) {
+try {
+Object obj = fields[i].get(null);
+ret.add(obj);
+} catch (IllegalArgumentException e) {
+LOG.error(e.getMessage(), e);
+} catch (IllegalAccessException e) {
+LOG.error(e.getMessage(), e);
+}
+}
+return ret;
+}
+
+public static String clusterMode(Map conf) {
+String mode = (String)conf.get(Config.STORM_CLUSTER_MODE);
+return mode;
+}
+
+public static boolean isLocalMode(Map conf) {
+String mode = (String)conf.get(Config.STORM_CLUSTER_MODE);
+if (mode != null) {
+if ("local".equals(mode)) {
+return true;
+}
+if ("distributed".equals(mode)) {
+return false;
+}
+}
+throw new IllegalArgumentException("Illegal cluster mode in conf: 
" + mode);
+}
+
+public static int samplingRate(Map conf) {
+double rate = 
Utils.getDouble(conf.get(Config.TOPOLOGY_STATS_SAMPLE_RATE));
+if (rate != 0) {
+return (int) (1 / rate);
+}
+  

[GitHub] storm pull request: [STORM-1227] port org.apache.storm.config to j...

2016-01-21 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/1030#discussion_r50460728
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java ---
@@ -0,0 +1,678 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.storm.Config;
+import org.apache.storm.validation.ConfigValidation;
+import org.apache.storm.generated.StormTopology;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Collections;
+import java.net.URLEncoder;
+
+public class ConfigUtils {
+private final static Logger LOG = 
LoggerFactory.getLogger(ConfigUtils.class);
+public final static String RESOURCES_SUBDIR = "resources";
+public final static String NIMBUS_DO_NOT_REASSIGN = 
"NIMBUS-DO-NOT-REASSIGN";
+public static final String FILE_SEPARATOR = File.separator;
+
+public static String getLogDir() {
+String dir;
+Map conf;
+if (System.getProperty("storm.log.dir") != null) {
+dir = System.getProperty("storm.log.dir");
+} else if ((conf = readStormConfig()).get("storm.log.dir") != 
null) {
+dir = String.valueOf(conf.get("storm.log.dir"));
+} else {
+if (System.getProperty("storm.home") != null) {
+dir = System.getProperty("storm.home") + FILE_SEPARATOR + 
"logs";
+} else {
+dir = FILE_SEPARATOR + "logs";
+}
+}
+try {
+return new File(dir).getCanonicalPath();
+} catch (IOException ex) {
+throw new IllegalArgumentException("Illegal storm.log.dir in 
conf: " + dir);
+}
+}
+
+public static String clojureConfigName(String name) {
+return name.toUpperCase().replace("_", "-");
+}
+
+// ALL-CONFIGS is only used by executor.clj once, do we want to do it 
here? TODO
+public static List All_CONFIGS() {
+List ret = new ArrayList();
+Config config = new Config();
+Class ConfigClass = config.getClass();
+Field[] fields = ConfigClass.getFields();
+for (int i = 0; i < fields.length; i++) {
+try {
+Object obj = fields[i].get(null);
+ret.add(obj);
+} catch (IllegalArgumentException e) {
+LOG.error(e.getMessage(), e);
+} catch (IllegalAccessException e) {
+LOG.error(e.getMessage(), e);
+}
+}
+return ret;
+}
+
+public static String clusterMode(Map conf) {
+String mode = (String)conf.get(Config.STORM_CLUSTER_MODE);
+return mode;
+}
+
+public static boolean isLocalMode(Map conf) {
+String mode = (String)conf.get(Config.STORM_CLUSTER_MODE);
+if (mode != null) {
+if ("local".equals(mode)) {
+return true;
+}
+if ("distributed".equals(mode)) {
+return false;
+}
+}
+throw new IllegalArgumentException("Illegal cluster mode in conf: 
" + mode);
+}
+
+public static int samplingRate(Map conf) {
+double rate = 
Utils.getDouble(conf.get(Config.TOPOLOGY_STATS_SAMPLE_RATE));
+if (rate != 0) {
+return (int) (1 / rate);
+}
+  

[GitHub] storm pull request: [STORM-1227] port org.apache.storm.config to j...

2016-01-21 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/1030#discussion_r50461090
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java ---
@@ -0,0 +1,678 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.storm.Config;
+import org.apache.storm.validation.ConfigValidation;
+import org.apache.storm.generated.StormTopology;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Collections;
+import java.net.URLEncoder;
+
+public class ConfigUtils {
+private final static Logger LOG = 
LoggerFactory.getLogger(ConfigUtils.class);
+public final static String RESOURCES_SUBDIR = "resources";
+public final static String NIMBUS_DO_NOT_REASSIGN = 
"NIMBUS-DO-NOT-REASSIGN";
+public static final String FILE_SEPARATOR = File.separator;
+
+public static String getLogDir() {
+String dir;
+Map conf;
+if (System.getProperty("storm.log.dir") != null) {
+dir = System.getProperty("storm.log.dir");
+} else if ((conf = readStormConfig()).get("storm.log.dir") != 
null) {
+dir = String.valueOf(conf.get("storm.log.dir"));
+} else {
+if (System.getProperty("storm.home") != null) {
+dir = System.getProperty("storm.home") + FILE_SEPARATOR + 
"logs";
+} else {
+dir = FILE_SEPARATOR + "logs";
+}
+}
+try {
+return new File(dir).getCanonicalPath();
+} catch (IOException ex) {
+throw new IllegalArgumentException("Illegal storm.log.dir in 
conf: " + dir);
+}
+}
+
+public static String clojureConfigName(String name) {
+return name.toUpperCase().replace("_", "-");
+}
+
+// ALL-CONFIGS is only used by executor.clj once, do we want to do it 
here? TODO
+public static List All_CONFIGS() {
+List ret = new ArrayList();
+Config config = new Config();
+Class ConfigClass = config.getClass();
+Field[] fields = ConfigClass.getFields();
+for (int i = 0; i < fields.length; i++) {
+try {
+Object obj = fields[i].get(null);
+ret.add(obj);
+} catch (IllegalArgumentException e) {
+LOG.error(e.getMessage(), e);
+} catch (IllegalAccessException e) {
+LOG.error(e.getMessage(), e);
+}
+}
+return ret;
+}
+
+public static String clusterMode(Map conf) {
+String mode = (String)conf.get(Config.STORM_CLUSTER_MODE);
+return mode;
+}
+
+public static boolean isLocalMode(Map conf) {
+String mode = (String)conf.get(Config.STORM_CLUSTER_MODE);
+if (mode != null) {
+if ("local".equals(mode)) {
+return true;
+}
+if ("distributed".equals(mode)) {
+return false;
+}
+}
+throw new IllegalArgumentException("Illegal cluster mode in conf: 
" + mode);
+}
+
+public static int samplingRate(Map conf) {
+double rate = 
Utils.getDouble(conf.get(Config.TOPOLOGY_STATS_SAMPLE_RATE));
+if (rate != 0) {
+return (int) (1 / rate);
+}
+  

[GitHub] storm pull request: [STORM-1227] port org.apache.storm.config to j...

2016-01-21 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/1030#discussion_r50462413
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java ---
@@ -0,0 +1,678 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.storm.Config;
+import org.apache.storm.validation.ConfigValidation;
+import org.apache.storm.generated.StormTopology;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Collections;
+import java.net.URLEncoder;
+
+public class ConfigUtils {
+private final static Logger LOG = 
LoggerFactory.getLogger(ConfigUtils.class);
+public final static String RESOURCES_SUBDIR = "resources";
+public final static String NIMBUS_DO_NOT_REASSIGN = 
"NIMBUS-DO-NOT-REASSIGN";
+public static final String FILE_SEPARATOR = File.separator;
+
+public static String getLogDir() {
+String dir;
+Map conf;
+if (System.getProperty("storm.log.dir") != null) {
+dir = System.getProperty("storm.log.dir");
+} else if ((conf = readStormConfig()).get("storm.log.dir") != 
null) {
+dir = String.valueOf(conf.get("storm.log.dir"));
+} else {
+if (System.getProperty("storm.home") != null) {
+dir = System.getProperty("storm.home") + FILE_SEPARATOR + 
"logs";
+} else {
+dir = FILE_SEPARATOR + "logs";
+}
+}
+try {
+return new File(dir).getCanonicalPath();
+} catch (IOException ex) {
+throw new IllegalArgumentException("Illegal storm.log.dir in 
conf: " + dir);
+}
+}
+
+public static String clojureConfigName(String name) {
+return name.toUpperCase().replace("_", "-");
+}
+
+// ALL-CONFIGS is only used by executor.clj once, do we want to do it 
here? TODO
+public static List All_CONFIGS() {
+List ret = new ArrayList();
+Config config = new Config();
+Class ConfigClass = config.getClass();
+Field[] fields = ConfigClass.getFields();
+for (int i = 0; i < fields.length; i++) {
+try {
+Object obj = fields[i].get(null);
+ret.add(obj);
+} catch (IllegalArgumentException e) {
+LOG.error(e.getMessage(), e);
+} catch (IllegalAccessException e) {
+LOG.error(e.getMessage(), e);
+}
+}
+return ret;
+}
+
+public static String clusterMode(Map conf) {
+String mode = (String)conf.get(Config.STORM_CLUSTER_MODE);
+return mode;
+}
+
+public static boolean isLocalMode(Map conf) {
+String mode = (String)conf.get(Config.STORM_CLUSTER_MODE);
+if (mode != null) {
+if ("local".equals(mode)) {
+return true;
+}
+if ("distributed".equals(mode)) {
+return false;
+}
+}
+throw new IllegalArgumentException("Illegal cluster mode in conf: 
" + mode);
+}
+
+public static int samplingRate(Map conf) {
+double rate = 
Utils.getDouble(conf.get(Config.TOPOLOGY_STATS_SAMPLE_RATE));
+if (rate != 0) {
+return (int) (1 / rate);
+}
+  

[GitHub] storm pull request: [STORM-1227] port org.apache.storm.config to j...

2016-01-21 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/1030#discussion_r50468248
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java ---
@@ -0,0 +1,678 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.storm.Config;
+import org.apache.storm.validation.ConfigValidation;
+import org.apache.storm.generated.StormTopology;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Collections;
+import java.net.URLEncoder;
+
+public class ConfigUtils {
+private final static Logger LOG = 
LoggerFactory.getLogger(ConfigUtils.class);
+public final static String RESOURCES_SUBDIR = "resources";
+public final static String NIMBUS_DO_NOT_REASSIGN = 
"NIMBUS-DO-NOT-REASSIGN";
+public static final String FILE_SEPARATOR = File.separator;
+
+public static String getLogDir() {
+String dir;
+Map conf;
+if (System.getProperty("storm.log.dir") != null) {
+dir = System.getProperty("storm.log.dir");
+} else if ((conf = readStormConfig()).get("storm.log.dir") != 
null) {
+dir = String.valueOf(conf.get("storm.log.dir"));
+} else {
+if (System.getProperty("storm.home") != null) {
+dir = System.getProperty("storm.home") + FILE_SEPARATOR + 
"logs";
+} else {
+dir = FILE_SEPARATOR + "logs";
+}
+}
+try {
+return new File(dir).getCanonicalPath();
+} catch (IOException ex) {
+throw new IllegalArgumentException("Illegal storm.log.dir in 
conf: " + dir);
+}
+}
+
+public static String clojureConfigName(String name) {
+return name.toUpperCase().replace("_", "-");
+}
+
+// ALL-CONFIGS is only used by executor.clj once, do we want to do it 
here? TODO
+public static List All_CONFIGS() {
+List ret = new ArrayList();
+Config config = new Config();
+Class ConfigClass = config.getClass();
+Field[] fields = ConfigClass.getFields();
+for (int i = 0; i < fields.length; i++) {
+try {
+Object obj = fields[i].get(null);
+ret.add(obj);
+} catch (IllegalArgumentException e) {
+LOG.error(e.getMessage(), e);
+} catch (IllegalAccessException e) {
+LOG.error(e.getMessage(), e);
+}
+}
+return ret;
+}
+
+public static String clusterMode(Map conf) {
+String mode = (String)conf.get(Config.STORM_CLUSTER_MODE);
+return mode;
+}
+
+public static boolean isLocalMode(Map conf) {
+String mode = (String)conf.get(Config.STORM_CLUSTER_MODE);
+if (mode != null) {
+if ("local".equals(mode)) {
+return true;
+}
+if ("distributed".equals(mode)) {
+return false;
+}
+}
+throw new IllegalArgumentException("Illegal cluster mode in conf: 
" + mode);
+}
+
+public static int samplingRate(Map conf) {
+double rate = 
Utils.getDouble(conf.get(Config.TOPOLOGY_STATS_SAMPLE_RATE));
+if (rate != 0) {
+return (int) (1 / rate);
+}
+  

[GitHub] storm pull request: Storm 1226

2016-01-25 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/1043#discussion_r50743788
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -1382,5 +1422,438 @@ public static TopologyInfo getTopologyInfo(String 
name, String asUser, Map storm
 public static int toPositive(int number) {
 return number & Integer.MAX_VALUE;
 }
-}
 
+
+
+//Everything from here on is translated from the old util.clj 
(storm-core/src/clj/backtype.storm/util.clj)
+
+//Wraps an exception in a RuntimeException if needed
+public static Exception wrapInRuntime (Exception e) {
+if (e instanceof RuntimeException) {
+return e;
+} else {
+return (new RuntimeException(e));
+}
+}
+
+public static final boolean isOnWindows = 
"Windows_NT".equals(System.getenv("OS"));
+
+public static final String filePathSeparator = 
System.getProperty("file.separator");
+
+public static final String classPathSeparator = 
System.getProperty("path.separator");
+
+
+/*
+Returns the first item of coll for which (pred item) returns 
logical true.
+Consumes sequences up to the first match, will consume the entire 
sequence
+and return nil if no match is found.
+ */
+public static Object findFirst (IPredicate pred, Collection coll) {
+if (coll == null || pred == null) {
+return null;
+} else {
+Iterator iter = coll.iterator();
+if (iter==null || !iter.hasNext()) {
+return null;
+} else {
+do {
--- End diff --

This if, else, and do-while can be replaced by a while:
```
while (iter != null && iter.hasNext()) {
...
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: Storm 1226

2016-01-25 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/1043#discussion_r50745533
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -1382,5 +1422,438 @@ public static TopologyInfo getTopologyInfo(String 
name, String asUser, Map storm
 public static int toPositive(int number) {
 return number & Integer.MAX_VALUE;
 }
-}
 
+
+
+//Everything from here on is translated from the old util.clj 
(storm-core/src/clj/backtype.storm/util.clj)
+
+//Wraps an exception in a RuntimeException if needed
+public static Exception wrapInRuntime (Exception e) {
+if (e instanceof RuntimeException) {
+return e;
+} else {
+return (new RuntimeException(e));
+}
+}
+
+public static final boolean isOnWindows = 
"Windows_NT".equals(System.getenv("OS"));
+
+public static final String filePathSeparator = 
System.getProperty("file.separator");
+
+public static final String classPathSeparator = 
System.getProperty("path.separator");
+
+
+/*
+Returns the first item of coll for which (pred item) returns 
logical true.
+Consumes sequences up to the first match, will consume the entire 
sequence
+and return nil if no match is found.
+ */
+public static Object findFirst (IPredicate pred, Collection coll) {
+if (coll == null || pred == null) {
+return null;
+} else {
+Iterator iter = coll.iterator();
+if (iter==null || !iter.hasNext()) {
+return null;
+} else {
+do {
+Object obj = iter.next();
+if (pred.test(obj)) {
+return obj;
+}
+} while (iter.hasNext());
+return null;
+}
+}
+}
+
+public static Object findFirst (IPredicate pred, Map map) {
+if (map == null || pred == null) {
+return null;
+} else {
+Iterator iter = map.entrySet().iterator();
+if (iter==null || !iter.hasNext()) {
+return null;
+} else {
+do {
--- End diff --

Same here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: Storm 1226

2016-01-25 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/1043#discussion_r50748875
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -1382,5 +1422,438 @@ public static TopologyInfo getTopologyInfo(String 
name, String asUser, Map storm
 public static int toPositive(int number) {
 return number & Integer.MAX_VALUE;
 }
-}
 
+
+
+//Everything from here on is translated from the old util.clj 
(storm-core/src/clj/backtype.storm/util.clj)
+
+//Wraps an exception in a RuntimeException if needed
+public static Exception wrapInRuntime (Exception e) {
+if (e instanceof RuntimeException) {
+return e;
+} else {
+return (new RuntimeException(e));
+}
+}
+
+public static final boolean isOnWindows = 
"Windows_NT".equals(System.getenv("OS"));
+
+public static final String filePathSeparator = 
System.getProperty("file.separator");
+
+public static final String classPathSeparator = 
System.getProperty("path.separator");
+
+
+/*
+Returns the first item of coll for which (pred item) returns 
logical true.
+Consumes sequences up to the first match, will consume the entire 
sequence
+and return nil if no match is found.
+ */
+public static Object findFirst (IPredicate pred, Collection coll) {
+if (coll == null || pred == null) {
+return null;
+} else {
+Iterator iter = coll.iterator();
+if (iter==null || !iter.hasNext()) {
+return null;
+} else {
+do {
+Object obj = iter.next();
+if (pred.test(obj)) {
+return obj;
+}
+} while (iter.hasNext());
+return null;
+}
+}
+}
+
+public static Object findFirst (IPredicate pred, Map map) {
+if (map == null || pred == null) {
+return null;
+} else {
+Iterator iter = map.entrySet().iterator();
+if (iter==null || !iter.hasNext()) {
+return null;
+} else {
+do {
+Object obj = iter.next();
+if (pred.test(obj)) {
+return obj;
+}
+} while (iter.hasNext());
+return null;
+}
+}
+}
+/*
+Note: since the following functions are nowhere used in Storm, 
they were not translated:
+dissoc-in
+indexed
+positions
+assoc-conj
+set-delta
+
+clojurify-structure  because it wouldn't make sense without clojure
+ */
+
+
+public static String localHostname () throws UnknownHostException {
+return _instance.localHostnameImpl();
+}
+
+protected String localHostnameImpl () throws UnknownHostException {
+return InetAddress.getLocalHost().getCanonicalHostName();
+}
+
+private static String memoizedLocalHostnameString = null;
+
+public static String memoizedLocalHostname () throws 
UnknownHostException {
+if (memoizedLocalHostnameString == null) {
+memoizedLocalHostnameString = localHostname();
+}
+return memoizedLocalHostnameString;
+}
+
+/*
+checks conf for STORM_LOCAL_HOSTNAME.
+when unconfigured, falls back to (memoized) guess by 
`local-hostname`.
+*/
+public static String hostname (Map conf) throws 
UnknownHostException  {
+if (conf == null) {
+return memoizedLocalHostname();
+}
+Object hostnameString = conf.get(Config.STORM_LOCAL_HOSTNAME);
+if (hostnameString == null ) {
+return memoizedLocalHostname();
+}
+if (hostnameString.equals("")) {
+return memoizedLocalHostname();
+}
+return hostnameString.toString();
+}
+
+public static String uuid() {
+return UUID.randomUUID().toString();
+}
+
+public static int currentTimeSecs() {
+return Time.currentTimeSecs();
+}
+
+public static long currentTimeMillis() {
+return Time.currentTimeMillis();
+}
+
+public static long secsToMillisLong(double secs) {
+return (long) (1000 * secs);
+}
  

[GitHub] storm pull request: Storm 1226

2016-01-25 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/1043#discussion_r50748974
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -1382,5 +1422,438 @@ public static TopologyInfo getTopologyInfo(String 
name, String asUser, Map storm
 public static int toPositive(int number) {
 return number & Integer.MAX_VALUE;
 }
-}
 
+
+
+//Everything from here on is translated from the old util.clj 
(storm-core/src/clj/backtype.storm/util.clj)
+
+//Wraps an exception in a RuntimeException if needed
+public static Exception wrapInRuntime (Exception e) {
+if (e instanceof RuntimeException) {
+return e;
+} else {
+return (new RuntimeException(e));
+}
+}
+
+public static final boolean isOnWindows = 
"Windows_NT".equals(System.getenv("OS"));
+
+public static final String filePathSeparator = 
System.getProperty("file.separator");
+
+public static final String classPathSeparator = 
System.getProperty("path.separator");
+
+
+/*
+Returns the first item of coll for which (pred item) returns 
logical true.
+Consumes sequences up to the first match, will consume the entire 
sequence
+and return nil if no match is found.
+ */
+public static Object findFirst (IPredicate pred, Collection coll) {
+if (coll == null || pred == null) {
+return null;
+} else {
+Iterator iter = coll.iterator();
+if (iter==null || !iter.hasNext()) {
+return null;
+} else {
+do {
+Object obj = iter.next();
+if (pred.test(obj)) {
+return obj;
+}
+} while (iter.hasNext());
+return null;
+}
+}
+}
+
+public static Object findFirst (IPredicate pred, Map map) {
+if (map == null || pred == null) {
+return null;
+} else {
+Iterator iter = map.entrySet().iterator();
+if (iter==null || !iter.hasNext()) {
+return null;
+} else {
+do {
+Object obj = iter.next();
+if (pred.test(obj)) {
+return obj;
+}
+} while (iter.hasNext());
+return null;
+}
+}
+}
+/*
+Note: since the following functions are nowhere used in Storm, 
they were not translated:
+dissoc-in
+indexed
+positions
+assoc-conj
+set-delta
+
+clojurify-structure  because it wouldn't make sense without clojure
+ */
+
+
+public static String localHostname () throws UnknownHostException {
+return _instance.localHostnameImpl();
+}
+
+protected String localHostnameImpl () throws UnknownHostException {
+return InetAddress.getLocalHost().getCanonicalHostName();
+}
+
+private static String memoizedLocalHostnameString = null;
+
+public static String memoizedLocalHostname () throws 
UnknownHostException {
+if (memoizedLocalHostnameString == null) {
+memoizedLocalHostnameString = localHostname();
+}
+return memoizedLocalHostnameString;
+}
+
+/*
+checks conf for STORM_LOCAL_HOSTNAME.
+when unconfigured, falls back to (memoized) guess by 
`local-hostname`.
+*/
+public static String hostname (Map conf) throws 
UnknownHostException  {
+if (conf == null) {
+return memoizedLocalHostname();
+}
+Object hostnameString = conf.get(Config.STORM_LOCAL_HOSTNAME);
+if (hostnameString == null ) {
+return memoizedLocalHostname();
+}
+if (hostnameString.equals("")) {
+return memoizedLocalHostname();
+}
+return hostnameString.toString();
+}
+
+public static String uuid() {
+return UUID.randomUUID().toString();
+}
+
+public static int currentTimeSecs() {
+return Time.currentTimeSecs();
+}
+
+public static long currentTimeMillis() {
+return Time.currentTimeMillis();
+}
+
+public static long secsToMillisLong(double secs) {
+return (long) (1000 * secs);
+}
  

[GitHub] storm pull request: Storm 1226

2016-01-25 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/1043#discussion_r50749149
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -1382,5 +1422,438 @@ public static TopologyInfo getTopologyInfo(String 
name, String asUser, Map storm
 public static int toPositive(int number) {
 return number & Integer.MAX_VALUE;
 }
-}
 
+
+
+//Everything from here on is translated from the old util.clj 
(storm-core/src/clj/backtype.storm/util.clj)
+
+//Wraps an exception in a RuntimeException if needed
+public static Exception wrapInRuntime (Exception e) {
+if (e instanceof RuntimeException) {
+return e;
+} else {
+return (new RuntimeException(e));
+}
+}
+
+public static final boolean isOnWindows = 
"Windows_NT".equals(System.getenv("OS"));
+
+public static final String filePathSeparator = 
System.getProperty("file.separator");
+
+public static final String classPathSeparator = 
System.getProperty("path.separator");
+
+
+/*
+Returns the first item of coll for which (pred item) returns 
logical true.
+Consumes sequences up to the first match, will consume the entire 
sequence
+and return nil if no match is found.
+ */
+public static Object findFirst (IPredicate pred, Collection coll) {
+if (coll == null || pred == null) {
+return null;
+} else {
+Iterator iter = coll.iterator();
+if (iter==null || !iter.hasNext()) {
+return null;
+} else {
+do {
+Object obj = iter.next();
+if (pred.test(obj)) {
+return obj;
+}
+} while (iter.hasNext());
+return null;
+}
+}
+}
+
+public static Object findFirst (IPredicate pred, Map map) {
+if (map == null || pred == null) {
+return null;
+} else {
+Iterator iter = map.entrySet().iterator();
+if (iter==null || !iter.hasNext()) {
+return null;
+} else {
+do {
+Object obj = iter.next();
+if (pred.test(obj)) {
+return obj;
+}
+} while (iter.hasNext());
+return null;
+}
+}
+}
+/*
+Note: since the following functions are nowhere used in Storm, 
they were not translated:
+dissoc-in
+indexed
+positions
+assoc-conj
+set-delta
+
+clojurify-structure  because it wouldn't make sense without clojure
+ */
+
+
+public static String localHostname () throws UnknownHostException {
+return _instance.localHostnameImpl();
+}
+
+protected String localHostnameImpl () throws UnknownHostException {
+return InetAddress.getLocalHost().getCanonicalHostName();
+}
+
+private static String memoizedLocalHostnameString = null;
+
+public static String memoizedLocalHostname () throws 
UnknownHostException {
+if (memoizedLocalHostnameString == null) {
+memoizedLocalHostnameString = localHostname();
+}
+return memoizedLocalHostnameString;
+}
+
+/*
+checks conf for STORM_LOCAL_HOSTNAME.
+when unconfigured, falls back to (memoized) guess by 
`local-hostname`.
+*/
+public static String hostname (Map conf) throws 
UnknownHostException  {
+if (conf == null) {
+return memoizedLocalHostname();
+}
+Object hostnameString = conf.get(Config.STORM_LOCAL_HOSTNAME);
+if (hostnameString == null ) {
+return memoizedLocalHostname();
+}
+if (hostnameString.equals("")) {
+return memoizedLocalHostname();
+}
+return hostnameString.toString();
+}
+
+public static String uuid() {
+return UUID.randomUUID().toString();
+}
+
+public static int currentTimeSecs() {
+return Time.currentTimeSecs();
+}
+
+public static long currentTimeMillis() {
+return Time.currentTimeMillis();
+}
+
+public static long secsToMillisLong(double secs) {
+return (long) (1000 * secs);
+}
  

  1   2   3   4   5   6   7   >