[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-14 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55963334
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---
@@ -0,0 +1,672 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed
+ * 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log)
+ * 3. of the rest, figure out what assignments aren't yet satisfied
+ * 4. generate new worker ids, write new "approved workers" to LS
+ * 5. create local dir for worker id
+ * 5. launch new workers (give worker-id, port, and supervisor-id)
+ * 6. wait for workers launch
+ */
+public class SyncProcessEvent implements Runnable {
+
+    private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class);
+
+    private LocalState localState;
+    private SupervisorData supervisorData;
+    public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1);
+
+    private class ProcessExitCallback implements Utils.ExitCodeCallable {
+        private final String logPrefix;
+        private final String workerId;
+
+        public ProcessExitCallback(String logPrefix, String workerId) {
+            this.logPrefix = logPrefix;
+            this.workerId = workerId;
+        }
+
+        @Override
+        public Object call() throws Exception {
+            return null;
+        }
+
+        @Override
+        public Object call(int exitCode) {
+            LOG.info("{} exited with code: {}", logPrefix, exitCode);
+            supervisorData.getDeadWorkers().add(workerId);
+            return null;
+        }
+    }
+
+    public SyncProcessEvent() {
+    }
+
+    public SyncProcessEvent(SupervisorData supervisorData) {
+        init(supervisorData);
+    }
+
+    // TODO: initData is intended for the local supervisor, so we will remove it after porting worker.clj to java
+    public void init(SupervisorData supervisorData) {
+        this.supervisorData = supervisorData;
+        this.localState = supervisorData.getLocalState();
+    }
+
+    /**
+     * 1. to kill are those in allocated that are dead or disallowed
+     * 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log)
+     * 3. of the rest, figure out what assignments aren't yet satisfied
+     * 4. generate new worker ids, write new "approved workers" to LS
+     * 5. create local dir for worker id
+     * 5. launch new workers (give worker-id, port, and supervisor-id)
+     * 6. wait for workers launch
+     */
+    @Override
+    public void run() {
+        LOG.debug("Syncing processes");
+        try {
+            Map conf = supervisorData.getConf();
+            Map assignedExecutors = localState.getLocalAssignmentsMap()
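
The numbered javadoc above is the contract for the whole sync pass. A condensed,
hypothetical sketch of those steps in plain Java -- none of the helper names
below (readAllocatedWorkers, isDead, killWorker, unsatisfiedPorts,
writeApprovedWorkers, createWorkerDirs, launchWorker, waitForWorkersLaunch)
are the PR's actual API, they only mirror the comment:

```java
// Hypothetical outline of the six sync steps; helper names are illustrative only.
void syncProcessesSketch(Map<Integer, String> portToAssignment) throws IOException {
    // 1-2. kill allocated workers that are dead or whose port is no longer assigned
    for (Map.Entry<String, Integer> worker : readAllocatedWorkers().entrySet()) {
        if (isDead(worker.getKey()) || !portToAssignment.containsKey(worker.getValue())) {
            killWorker(worker.getKey()); // kill -9, then rm heartbeat/pid/id dirs, logging failures
        }
    }
    // 3-4. ports whose assignment has no live worker each get a fresh worker id
    Map<Integer, String> newWorkers = new HashMap<>();
    for (Integer port : unsatisfiedPorts(portToAssignment)) {
        newWorkers.put(port, UUID.randomUUID().toString());
    }
    writeApprovedWorkers(newWorkers); // "write new approved workers" to LocalState
    // 5-6. create the local dir for each worker id, launch, then wait
    for (Map.Entry<Integer, String> worker : newWorkers.entrySet()) {
        createWorkerDirs(worker.getValue());
        launchWorker(worker.getValue(), worker.getKey()); // worker-id, port (+ supervisor-id)
    }
    waitForWorkersLaunch(newWorkers.values());
}
```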

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-14 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55963987
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-14 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55964084
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-14 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55965454
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java 
---
@@ -0,0 +1,631 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.generated.*;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.LocalizedResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.*;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SyncSupervisorEvent implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class);
+
+    private EventManager syncSupEventManager;
+    private EventManager syncProcessManager;
+    private IStormClusterState stormClusterState;
+    private LocalState localState;
+    private SyncProcessEvent syncProcesses;
+    private SupervisorData supervisorData;
+
+    public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses,
+            EventManager syncSupEventManager, EventManager syncProcessManager) {
+        this.syncProcesses = syncProcesses;
+        this.syncSupEventManager = syncSupEventManager;
+        this.syncProcessManager = syncProcessManager;
+        this.stormClusterState = supervisorData.getStormClusterState();
+        this.localState = supervisorData.getLocalState();
+        this.supervisorData = supervisorData;
+    }
+
+    @Override
+    public void run() {
+        try {
+            Map conf = supervisorData.getConf();
+            Runnable syncCallback = new EventManagerPushCallback(this, syncSupEventManager);
+            List stormIds = stormClusterState.assignments(syncCallback);
+            Map assignmentsSnapshot =
+                    getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback);
+            Map stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds);
+
+            Set allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf);
+            Map stormcodeMap = readStormCodeLocations(assignmentsSnapshot);
+            Map existingAssignment = localState.getLocalAssignmentsMap();
+            if (existingAssignment == null) {
+                existingAssignment = new HashMap<>();
+            }
+
+            Map allAssignment =
+                    readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry());
+
+            Map newAssignment = new HashMap<>();
+            Set assignedStormIds = new HashSet<>();
+
+            for (Map.Entry entry : allAssignment.entrySet()) {
+                if (supervisorData.getiSupervisor().confirmAssigned(entry.getKey())) {
+                    newAssignment.put(entry.getKey(), entry.getValue());
+                    assignedStormIds.add(entry.getValue().get_topology_id());
+                }
+            }
+
+            Set srashStormIds = verifyDownl


[GitHub] storm pull request: STORM-1464: Support multiple file outputs

2016-03-14 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1044#discussion_r55965801
  
--- Diff: external/storm-hdfs/README.md ---
@@ -240,6 +240,23 @@ If you are using Trident and sequence files you can do something like this:
         .addRotationAction(new MoveFileAction().withDestination("/dest2/"));
 ```
 
+### Data Partitioning
+Data can be partitioned to different HDFS directories based on characteristics of the tuple being processed or purely
+external factors, such as system time. To partition your data, write a class that implements the ```Partitioner```
+interface and pass it to the withPartitioner() method of your bolt. The getPartitionPath() method returns a partition
+path for a given tuple.
+
+Here's an example of a Partitioner that operates on a specific field of data:
+
+```java
+
+Partitioner partitioner = new Partitioner() {
+    @Override
+    public String getPartitionPath(Tuple tuple) {
+        return Path.SEPARATOR + "city=" + tuple.getStringByField("city");
--- End diff --

Why does the example path contain "city="?
If we are expecting the Partitioner to return a path, it's better to return a
Path (java.nio.file.Path) so that it does not return an arbitrary string.
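
For context, the `city=` prefix looks like the Hive-style `key=value`
partition-directory convention; that reading is an assumption, not something
the PR states. With the README's partitioner a tuple stream fans out like this
(Partitioner, Tuple and Path are the types from the diff above):

```java
// Tuples land in Hive-style directories, e.g. /city=Chicago/... and /city=Berlin/...
// "city=" is the Hive partition-column naming convention, not an API requirement.
Partitioner byCity = new Partitioner() {
    @Override
    public String getPartitionPath(Tuple tuple) {
        return Path.SEPARATOR + "city=" + tuple.getStringByField("city");
    }
};
```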


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1464: Support multiple file outputs

2016-03-14 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1044#discussion_r55965826
  
--- Diff: 
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
 ---
@@ -52,9 +54,10 @@
      * Half of the default Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
      */
     private static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15;
+    private static final Integer DEFAULT_MAX_OPEN_FILES = 5;
--- End diff --

Isn't the default too low?




[jira] [Commented] (STORM-1464) storm-hdfs should support writing to multiple files

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192881#comment-15192881
 ] 

ASF GitHub Bot commented on STORM-1464:
---

Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1044#discussion_r55965801
  

> storm-hdfs should support writing to multiple files
> ---
>
> Key: STORM-1464
> URL: https://issues.apache.org/jira/browse/STORM-1464
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-hdfs
>Reporter: Aaron Dossett
>Assignee: Aaron Dossett
>  Labels: avro
>
> Examples of when this is needed include:
> - One avro bolt writing multiple schemas, each of which require a different 
> file. Schema evolution is a common use of avro and the avro bolt should 
> support that seamlessly.
> - Partitioning output to different directories based on the tuple contents.  
> For example, if the tuple contains a "USER" field, it should be possible to 
> partition based on that value.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)




[GitHub] storm pull request: STORM-1464: Support multiple file outputs

2016-03-14 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1044#discussion_r55965855
  
--- Diff: 
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
 ---
@@ -198,22 +194,62 @@ public final void execute(Tuple tuple) {
                 }
             }
 
-            if(this.rotationPolicy.mark(tuple, this.offset)) {
-                try {
-                    rotateOutputFile();
-                    this.rotationPolicy.reset();
-                    this.offset = 0;
-                } catch (IOException e) {
-                    this.collector.reportError(e);
-                    LOG.warn("File could not be rotated");
-                    //At this point there is nothing to do.  In all likelihood any filesystem operations will fail.
-                    //The next tuple will almost certainly fail to write and/or sync, which force a rotation.  That
-                    //will give rotateAndReset() a chance to work which includes creating a fresh file handle.
-                }
+            if (writer != null && writer.needsRotation()) {
+                doRotationAndRemoveWriter(writerKey, writer);
             }
         }
     }
 
+    private AbstractHDFSWriter getOrCreateWriter(String writerKey, Tuple tuple) throws IOException {
+        AbstractHDFSWriter writer;
+
+        writer = writers.get(writerKey);
+        if (writer == null) {
+            if (writers.size() >= maxOpenFiles)
+            {
+                String keyToOldest = getKeyToOldestWriter();
+                AbstractHDFSWriter oldest = writers.get(keyToOldest);
--- End diff --

Why not have getKeyToOldestWriter return the oldest entry directly and remove
the extra call to writers.get?
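
A minimal sketch of the suggested refactor, assuming the `writers` map and
`AbstractHDFSWriter.getLastUsedTime()` shown in the diff above:

```java
// Return the oldest entry itself so the caller needs no second writers.get() lookup.
private Map.Entry<String, AbstractHDFSWriter> getOldestWriterEntry() {
    Map.Entry<String, AbstractHDFSWriter> oldest = null;
    for (final Map.Entry<String, AbstractHDFSWriter> entry : writers.entrySet()) {
        if (oldest == null
                || entry.getValue().getLastUsedTime() < oldest.getValue().getLastUsedTime()) {
            oldest = entry;
        }
    }
    return oldest; // null when no writers are open
}
```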




[GitHub] storm pull request: STORM-1464: Support multiple file outputs

2016-03-14 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1044#discussion_r55965843
  
--- Diff: 
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
 ---
@@ -145,13 +134,20 @@ public final void execute(Tuple tuple) {
 
         synchronized (this.writeLock) {
             boolean forceSync = false;
+            AbstractHDFSWriter writer = null;
+            String writerKey = null;
+
             if (TupleUtils.isTick(tuple)) {
                 LOG.debug("TICK! forcing a file system flush");
                 this.collector.ack(tuple);
                 forceSync = true;
             } else {
+
+                writerKey = getHashKeyForTuple(tuple);
--- End diff --

Can't the partition path be used as the key?






[GitHub] storm pull request: STORM-1464: Support multiple file outputs

2016-03-14 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1044#discussion_r55965890
  
--- Diff: 
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
 ---
@@ -198,22 +194,62 @@ public final void execute(Tuple tuple) {
                 }
             }
 
-            if(this.rotationPolicy.mark(tuple, this.offset)) {
-                try {
-                    rotateOutputFile();
-                    this.rotationPolicy.reset();
-                    this.offset = 0;
-                } catch (IOException e) {
-                    this.collector.reportError(e);
-                    LOG.warn("File could not be rotated");
-                    //At this point there is nothing to do.  In all likelihood any filesystem operations will fail.
-                    //The next tuple will almost certainly fail to write and/or sync, which force a rotation.  That
-                    //will give rotateAndReset() a chance to work which includes creating a fresh file handle.
-                }
+            if (writer != null && writer.needsRotation()) {
+                doRotationAndRemoveWriter(writerKey, writer);
             }
         }
     }
 
+    private AbstractHDFSWriter getOrCreateWriter(String writerKey, Tuple tuple) throws IOException {
+        AbstractHDFSWriter writer;
+
+        writer = writers.get(writerKey);
+        if (writer == null) {
+            if (writers.size() >= maxOpenFiles)
+            {
+                String keyToOldest = getKeyToOldestWriter();
+                AbstractHDFSWriter oldest = writers.get(keyToOldest);
+                rotateOutputFile(oldest);
+                writers.remove(keyToOldest);
--- End diff --

Is the oldest entry removed to maintain the max size of the writers hashtable
(the max-open-files limit)? nit: most of the above can be taken care of if the
writers map was a LinkedHashMap extension with LRU logic.
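
A rough sketch of that nit, assuming the `writers`/`maxOpenFiles` fields and
`rotateOutputFile` from the diff above: an access-ordered LinkedHashMap whose
removeEldestEntry hook evicts (and rotates) the least-recently-used writer.

```java
// Access-ordered map: get()/put() move an entry to the tail, so the head is
// always the least-recently-used writer; removeEldestEntry runs on each put().
private final Map<String, AbstractHDFSWriter> writers =
        new LinkedHashMap<String, AbstractHDFSWriter>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, AbstractHDFSWriter> eldest) {
                if (size() <= maxOpenFiles) {
                    return false;
                }
                try {
                    rotateOutputFile(eldest.getValue()); // close/rotate before eviction
                } catch (IOException e) {
                    LOG.warn("Could not rotate evicted writer", e);
                }
                return true; // evict the eldest entry
            }
        };
```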




[GitHub] storm pull request: STORM-1464: Support multiple file outputs

2016-03-14 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1044#discussion_r55965901
  
--- Diff: 
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
 ---
@@ -198,22 +194,62 @@ public final void execute(Tuple tuple) {
                 }
             }
 
-            if(this.rotationPolicy.mark(tuple, this.offset)) {
-                try {
-                    rotateOutputFile();
-                    this.rotationPolicy.reset();
-                    this.offset = 0;
-                } catch (IOException e) {
-                    this.collector.reportError(e);
-                    LOG.warn("File could not be rotated");
-                    //At this point there is nothing to do.  In all likelihood any filesystem operations will fail.
-                    //The next tuple will almost certainly fail to write and/or sync, which force a rotation.  That
-                    //will give rotateAndReset() a chance to work which includes creating a fresh file handle.
-                }
+            if (writer != null && writer.needsRotation()) {
+                doRotationAndRemoveWriter(writerKey, writer);
             }
         }
     }
 
+    private AbstractHDFSWriter getOrCreateWriter(String writerKey, Tuple tuple) throws IOException {
+        AbstractHDFSWriter writer;
+
+        writer = writers.get(writerKey);
+        if (writer == null) {
+            if (writers.size() >= maxOpenFiles)
+            {
+                String keyToOldest = getKeyToOldestWriter();
+                AbstractHDFSWriter oldest = writers.get(keyToOldest);
+                rotateOutputFile(oldest);
+                writers.remove(keyToOldest);
+            }
+
+            Path pathForNextFile = getBasePathForNextFile(tuple);
+            writer = makeNewWriter(pathForNextFile, tuple);
+            writers.put(writerKey, writer);
+            this.rotation++;
+        }
+        return writer;
+    }
+
+    /**
+     * A tuple must be mapped to a writer based on two factors:
+     *  - bolt specific logic that must separate tuples into different files in the same directory (see the avro bolt
+     *    for an example of this)
+     *  - the directory the tuple will be partitioned into
+     *
+     * @param tuple
+     * @return
+     */
+    private String getHashKeyForTuple(Tuple tuple) {
+        final String boltKey = getWriterKey(tuple);
--- End diff --

Why a separate key instead of the partition path itself as the key?
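
What the suggestion would look like, as a hedged sketch (names from the diff
above); whether the avro bolt's per-schema file separation still works once
the bolt-specific key is dropped is exactly the open question:

```java
// If the partition path alone identified a writer, the combined hash key
// would collapse to the partitioner output and getWriterKey() could go away.
private String getHashKeyForTuple(Tuple tuple) {
    return this.partitioner.getPartitionPath(tuple);
}
```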




[GitHub] storm pull request: STORM-1464: Support multiple file outputs

2016-03-14 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1044#discussion_r55965923
  
--- Diff: 
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
 ---
@@ -223,29 +259,57 @@ public final void execute(Tuple tuple) {
     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
     }
 
-    /**
-     * writes a tuple to the underlying filesystem but makes no guarantees about syncing data.
-     *
-     * this.offset is also updated to reflect additional data written
-     *
-     * @param tuple
-     * @throws IOException
-     */
-    abstract void writeTuple(Tuple tuple) throws IOException;
+    private void syncAllWriters() throws IOException {
+        for (AbstractHDFSWriter writer : writers.values()) {
+            writer.sync();
+        }
+    }
 
-    /**
-     * Make the best effort to sync written data to the underlying file system.  Concrete classes should very clearly
-     * state the file state that sync guarantees.  For example, HdfsBolt can make a much stronger guarantee than
-     * SequenceFileBolt.
-     *
-     * @throws IOException
-     */
-    abstract void syncTuples() throws IOException;
+    private String getKeyToOldestWriter()
+    {
+        String oldestKey = null;
+        long oldestTime = Long.MAX_VALUE;
+        for (final Map.Entry entry : writers.entrySet()) {
+            if (entry.getValue().getLastUsedTime() < oldestTime) {
+                oldestKey = entry.getKey();
+                oldestTime = entry.getValue().getLastUsedTime();
+            }
+        }
 
-    abstract void closeOutputFile() throws IOException;
+        return oldestKey;
+    }
 
-    abstract Path createOutputFile() throws IOException;
+    private void startTimedRotationPolicy() {
+        long interval = ((TimedRotationPolicy) this.rotationPolicy).getInterval();
+        this.rotationTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+            @Override
+            public void run() {
+                for (final AbstractHDFSWriter writer : writers.values()) {
+                    try {
+                        rotateOutputFile(writer);
+                    } catch (IOException e) {
+                        LOG.warn("IOException during scheduled file rotation.", e);
+                    }
+                }
+                writers.clear();
+            }
+        };
+        this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
+    }
+
+    protected Path getBasePathForNextFile(Tuple tuple) {
+
+        Path fullPathToFile = new Path(this.fsUrl + this.fileNameFormat.getPath() + this.partitioner.getPartitionPath(tuple),
+                this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
--- End diff --

Since `this.rotation` is not per writer, we could end up having non-contiguous
file names (like file-1, file-5, file-8 etc.) inside a partition path, correct?
If so it should be fixed.
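
One possible fix, sketched with a hypothetical per-writer counter
(perWriterRotation is not in the PR; fileNameFormat, partitioner and fsUrl are
the fields from the diff above):

```java
// Each writer key numbers its own files contiguously (file-0, file-1, ...)
// instead of sharing the bolt-wide this.rotation counter.
private final Map<String, Integer> perWriterRotation = new HashMap<>();

protected Path getBasePathForNextFile(Tuple tuple, String writerKey) {
    int rotation = perWriterRotation.getOrDefault(writerKey, 0);
    perWriterRotation.put(writerKey, rotation + 1);
    return new Path(this.fsUrl + this.fileNameFormat.getPath() + this.partitioner.getPartitionPath(tuple),
            this.fileNameFormat.getName(rotation, System.currentTimeMillis()));
}
```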




[GitHub] storm pull request: STORM-1464: Support multiple file outputs

2016-03-14 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1044#discussion_r55965933
  
--- Diff: 
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
 ---
@@ -223,29 +259,57 @@ public final void execute(Tuple tuple) {
     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
     }
 
-    /**
-     * writes a tuple to the underlying filesystem but makes no guarantees about syncing data.
-     *
-     * this.offset is also updated to reflect additional data written
-     *
-     * @param tuple
-     * @throws IOException
-     */
-    abstract void writeTuple(Tuple tuple) throws IOException;
+    private void syncAllWriters() throws IOException {
+        for (AbstractHDFSWriter writer : writers.values()) {
+            writer.sync();
+        }
+    }
 
-    /**
-     * Make the best effort to sync written data to the underlying file system.  Concrete classes should very clearly
-     * state the file state that sync guarantees.  For example, HdfsBolt can make a much stronger guarantee than
-     * SequenceFileBolt.
-     *
-     * @throws IOException
-     */
-    abstract void syncTuples() throws IOException;
+    private String getKeyToOldestWriter()
+    {
+        String oldestKey = null;
+        long oldestTime = Long.MAX_VALUE;
+        for (final Map.Entry entry : writers.entrySet()) {
+            if (entry.getValue().getLastUsedTime() < oldestTime) {
+                oldestKey = entry.getKey();
+                oldestTime = entry.getValue().getLastUsedTime();
+            }
+        }
 
-    abstract void closeOutputFile() throws IOException;
+        return oldestKey;
+    }
 
-    abstract Path createOutputFile() throws IOException;
+    private void startTimedRotationPolicy() {
+        long interval = ((TimedRotationPolicy) this.rotationPolicy).getInterval();
+        this.rotationTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+            @Override
+            public void run() {
+                for (final AbstractHDFSWriter writer : writers.values()) {
+                    try {
+                        rotateOutputFile(writer);
+                    } catch (IOException e) {
+                        LOG.warn("IOException during scheduled file rotation.", e);
+                    }
+                }
+                writers.clear();
+            }
+        };
+        this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
+    }
+
+    protected Path getBasePathForNextFile(Tuple tuple) {
+
+        Path fullPathToFile = new Path(this.fsUrl + this.fileNameFormat.getPath() + this.partitioner.getPartitionPath(tuple),
+                this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
+
+        return fullPathToFile;
+    }
 
     abstract void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException;
 
+    abstract String getWriterKey(Tuple tuple);
--- End diff --

This method is not required if the partition path is used as the key.




[jira] [Commented] (STORM-1464) storm-hdfs should support writing to multiple files

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192884#comment-15192884
 ] 

ASF GitHub Bot commented on STORM-1464:
---

Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1044#discussion_r55965855
  
--- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
@@ -198,22 +194,62 @@ public final void execute(Tuple tuple) {
             }
         }
 
-        if (this.rotationPolicy.mark(tuple, this.offset)) {
-            try {
-                rotateOutputFile();
-                this.rotationPolicy.reset();
-                this.offset = 0;
-            } catch (IOException e) {
-                this.collector.reportError(e);
-                LOG.warn("File could not be rotated");
-                //At this point there is nothing to do.  In all likelihood any filesystem operations will fail.
-                //The next tuple will almost certainly fail to write and/or sync, which will force a rotation.  That
-                //will give rotateAndReset() a chance to work, which includes creating a fresh file handle.
-            }
+        if (writer != null && writer.needsRotation()) {
+            doRotationAndRemoveWriter(writerKey, writer);
         }
     }
 }
 
+    private AbstractHDFSWriter getOrCreateWriter(String writerKey, Tuple tuple) throws IOException {
+        AbstractHDFSWriter writer;
+
+        writer = writers.get(writerKey);
+        if (writer == null) {
+            if (writers.size() >= maxOpenFiles)
+            {
+                String keyToOldest = getKeyToOldestWriter();
+                AbstractHDFSWriter oldest = writers.get(keyToOldest);
--- End diff --

Why not have getKeyToOldestWriter return the oldest entry directly and remove the extra call to writers.get?
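
For illustration, a minimal sketch of that suggestion, returning the entry itself so the caller avoids the second map lookup (the method name `getOldestWriterEntry` is illustrative, not from the PR):

```java
// Sketch: return the oldest entry directly instead of just its key,
// so callers need no extra writers.get() lookup.
private Map.Entry<String, AbstractHDFSWriter> getOldestWriterEntry() {
    Map.Entry<String, AbstractHDFSWriter> oldest = null;
    for (final Map.Entry<String, AbstractHDFSWriter> entry : writers.entrySet()) {
        if (oldest == null || entry.getValue().getLastUsedTime() < oldest.getValue().getLastUsedTime()) {
            oldest = entry;
        }
    }
    return oldest; // null when the map is empty
}
```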


> storm-hdfs should support writing to multiple files
> ---
>
> Key: STORM-1464
> URL: https://issues.apache.org/jira/browse/STORM-1464
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-hdfs
>Reporter: Aaron Dossett
>Assignee: Aaron Dossett
>  Labels: avro
>
> Examples of when this is needed include:
> - One avro bolt writing multiple schemas, each of which requires a different 
> file. Schema evolution is a common use of avro, and the avro bolt should 
> support it seamlessly.
> - Partitioning output to different directories based on the tuple contents.  
> For example, if the tuple contains a "USER" field, it should be possible to 
> partition based on that value.





[jira] [Commented] (STORM-1464) storm-hdfs should support writing to multiple files

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192886#comment-15192886
 ] 

ASF GitHub Bot commented on STORM-1464:
---

Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1044#discussion_r55965901
  
--- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
@@ -198,22 +194,62 @@ public final void execute(Tuple tuple) {
             }
         }
 
-        if (this.rotationPolicy.mark(tuple, this.offset)) {
-            try {
-                rotateOutputFile();
-                this.rotationPolicy.reset();
-                this.offset = 0;
-            } catch (IOException e) {
-                this.collector.reportError(e);
-                LOG.warn("File could not be rotated");
-                //At this point there is nothing to do.  In all likelihood any filesystem operations will fail.
-                //The next tuple will almost certainly fail to write and/or sync, which will force a rotation.  That
-                //will give rotateAndReset() a chance to work, which includes creating a fresh file handle.
-            }
+        if (writer != null && writer.needsRotation()) {
+            doRotationAndRemoveWriter(writerKey, writer);
         }
     }
 }
 
+    private AbstractHDFSWriter getOrCreateWriter(String writerKey, Tuple tuple) throws IOException {
+        AbstractHDFSWriter writer;
+
+        writer = writers.get(writerKey);
+        if (writer == null) {
+            if (writers.size() >= maxOpenFiles)
+            {
+                String keyToOldest = getKeyToOldestWriter();
+                AbstractHDFSWriter oldest = writers.get(keyToOldest);
+                rotateOutputFile(oldest);
+                writers.remove(keyToOldest);
+            }
+
+            Path pathForNextFile = getBasePathForNextFile(tuple);
+            writer = makeNewWriter(pathForNextFile, tuple);
+            writers.put(writerKey, writer);
+            this.rotation++;
+        }
+        return writer;
+    }
+
+    /**
+     * A tuple must be mapped to a writer based on two factors:
+     *  - bolt specific logic that must separate tuples into different files in the same directory (see the avro bolt
+     *    for an example of this)
+     *  - the directory the tuple will be partitioned into
+     *
+     * @param tuple
+     * @return
+     */
+    private String getHashKeyForTuple(Tuple tuple) {
+        final String boltKey = getWriterKey(tuple);
--- End diff --

Why use a separate key instead of the partition path itself as the key?


> storm-hdfs should support writing to multiple files
> ---
>
> Key: STORM-1464
> URL: https://issues.apache.org/jira/browse/STORM-1464
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-hdfs
>Reporter: Aaron Dossett
>Assignee: Aaron Dossett
>  Labels: avro
>
> Examples of when this is needed include:
> - One avro bolt writing multiple schemas, each of which requires a different 
> file. Schema evolution is a common use of avro, and the avro bolt should 
> support it seamlessly.
> - Partitioning output to different directories based on the tuple contents.  
> For example, if the tuple contains a "USER" field, it should be possible to 
> partition based on that value.





[jira] [Commented] (STORM-1464) storm-hdfs should support writing to multiple files

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192888#comment-15192888
 ] 

ASF GitHub Bot commented on STORM-1464:
---

Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1044#discussion_r55965933
  
--- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
@@ -223,29 +259,57 @@ public final void execute(Tuple tuple) {
     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
     }
 
-    /**
-     * writes a tuple to the underlying filesystem but makes no guarantees about syncing data.
-     *
-     * this.offset is also updated to reflect additional data written
-     *
-     * @param tuple
-     * @throws IOException
-     */
-    abstract void writeTuple(Tuple tuple) throws IOException;
+    private void syncAllWriters() throws IOException {
+        for (AbstractHDFSWriter writer : writers.values()) {
+            writer.sync();
+        }
+    }
 
-    /**
-     * Make the best effort to sync written data to the underlying file system.  Concrete classes should very clearly
-     * state the file state that sync guarantees.  For example, HdfsBolt can make a much stronger guarantee than
-     * SequenceFileBolt.
-     *
-     * @throws IOException
-     */
-    abstract void syncTuples() throws IOException;
+    private String getKeyToOldestWriter()
+    {
+        String oldestKey = null;
+        long oldestTime = Long.MAX_VALUE;
+        for (final Map.Entry<String, AbstractHDFSWriter> entry : writers.entrySet()) {
+            if (entry.getValue().getLastUsedTime() < oldestTime) {
+                oldestKey = entry.getKey();
+                oldestTime = entry.getValue().getLastUsedTime();
+            }
+        }
 
-    abstract void closeOutputFile() throws IOException;
+        return oldestKey;
+    }
 
-    abstract Path createOutputFile() throws IOException;
+    private void startTimedRotationPolicy() {
+        long interval = ((TimedRotationPolicy) this.rotationPolicy).getInterval();
+        this.rotationTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+            @Override
+            public void run() {
+                for (final AbstractHDFSWriter writer : writers.values()) {
+                    try {
+                        rotateOutputFile(writer);
+                    } catch (IOException e) {
+                        LOG.warn("IOException during scheduled file rotation.", e);
+                    }
+                }
+                writers.clear();
+            }
+        };
+        this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
+    }
+
+    protected Path getBasePathForNextFile(Tuple tuple) {
+        Path fullPathToFile = new Path(this.fsUrl + this.fileNameFormat.getPath() + this.partitioner.getPartitionPath(tuple),
+                this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
+        return fullPathToFile;
+    }
 
     abstract void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException;
 
+    abstract String getWriterKey(Tuple tuple);
--- End diff --

This method is not required if the partition path is used as the key.


> storm-hdfs should support writing to multiple files
> ---
>
> Key: STORM-1464
> URL: https://issues.apache.org/jira/browse/STORM-1464
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-hdfs
>Reporter: Aaron Dossett
>Assignee: Aaron Dossett
>  Labels: avro
>
> Examples of when this is needed include:
> - One avro bolt writing multiple schemas, each of which requires a different 
> file. Schema evolution is a common use of avro, and the avro bolt should 
> support it seamlessly.
> - Partitioning output to different directories based on the tuple contents.  
> For example, if the tuple contains a "USER" field, it should be possible to 
> partition based on that value.





[jira] [Commented] (STORM-1464) storm-hdfs should support writing to multiple files

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192887#comment-15192887
 ] 

ASF GitHub Bot commented on STORM-1464:
---

Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1044#discussion_r55965923
  
--- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
@@ -223,29 +259,57 @@ public final void execute(Tuple tuple) {
     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
     }
 
-    /**
-     * writes a tuple to the underlying filesystem but makes no guarantees about syncing data.
-     *
-     * this.offset is also updated to reflect additional data written
-     *
-     * @param tuple
-     * @throws IOException
-     */
-    abstract void writeTuple(Tuple tuple) throws IOException;
+    private void syncAllWriters() throws IOException {
+        for (AbstractHDFSWriter writer : writers.values()) {
+            writer.sync();
+        }
+    }
 
-    /**
-     * Make the best effort to sync written data to the underlying file system.  Concrete classes should very clearly
-     * state the file state that sync guarantees.  For example, HdfsBolt can make a much stronger guarantee than
-     * SequenceFileBolt.
-     *
-     * @throws IOException
-     */
-    abstract void syncTuples() throws IOException;
+    private String getKeyToOldestWriter()
+    {
+        String oldestKey = null;
+        long oldestTime = Long.MAX_VALUE;
+        for (final Map.Entry<String, AbstractHDFSWriter> entry : writers.entrySet()) {
+            if (entry.getValue().getLastUsedTime() < oldestTime) {
+                oldestKey = entry.getKey();
+                oldestTime = entry.getValue().getLastUsedTime();
+            }
+        }
 
-    abstract void closeOutputFile() throws IOException;
+        return oldestKey;
+    }
 
-    abstract Path createOutputFile() throws IOException;
+    private void startTimedRotationPolicy() {
+        long interval = ((TimedRotationPolicy) this.rotationPolicy).getInterval();
+        this.rotationTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+            @Override
+            public void run() {
+                for (final AbstractHDFSWriter writer : writers.values()) {
+                    try {
+                        rotateOutputFile(writer);
+                    } catch (IOException e) {
+                        LOG.warn("IOException during scheduled file rotation.", e);
+                    }
+                }
+                writers.clear();
+            }
+        };
+        this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
+    }
+
+    protected Path getBasePathForNextFile(Tuple tuple) {
+        Path fullPathToFile = new Path(this.fsUrl + this.fileNameFormat.getPath() + this.partitioner.getPartitionPath(tuple),
+                this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
--- End diff --

Since `this.rotation` is not per writer, we could end up with non-contiguous file names (like file-1, file-5, file-8, etc.) inside a partition path, correct? If so, it should be fixed; one possible approach is sketched below.
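
One possible fix, as a hedged sketch only: keep a rotation counter per writer key instead of the single bolt-wide `this.rotation` (the `rotationCounts` map and `nextRotationIndex` helper are illustrative, not part of the PR):

```java
// Sketch: per-key rotation counters, so file indices stay contiguous
// within each partition path.
private final Map<String, Integer> rotationCounts = new HashMap<>();

private int nextRotationIndex(String writerKey) {
    Integer current = rotationCounts.get(writerKey);
    int next = (current == null) ? 1 : current + 1;
    rotationCounts.put(writerKey, next);
    return next;
}
```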


> storm-hdfs should support writing to multiple files
> ---
>
> Key: STORM-1464
> URL: https://issues.apache.org/jira/browse/STORM-1464
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-hdfs
>Reporter: Aaron Dossett
>Assignee: Aaron Dossett
>  Labels: avro
>
> Examples of when this is needed include:
> - One avro bolt writing multiple schemas, each of which requires a different 
> file. Schema evolution is a common use of avro, and the avro bolt should 
> support it seamlessly.
> - Partitioning output to different directories based on the tuple contents.  
> For example, if the tuple contains a "USER" field, it should be possible to 
> partition based on that value.





[jira] [Commented] (STORM-1464) storm-hdfs should support writing to multiple files

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192885#comment-15192885
 ] 

ASF GitHub Bot commented on STORM-1464:
---

Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1044#discussion_r55965890
  
--- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
@@ -198,22 +194,62 @@ public final void execute(Tuple tuple) {
             }
         }
 
-        if (this.rotationPolicy.mark(tuple, this.offset)) {
-            try {
-                rotateOutputFile();
-                this.rotationPolicy.reset();
-                this.offset = 0;
-            } catch (IOException e) {
-                this.collector.reportError(e);
-                LOG.warn("File could not be rotated");
-                //At this point there is nothing to do.  In all likelihood any filesystem operations will fail.
-                //The next tuple will almost certainly fail to write and/or sync, which will force a rotation.  That
-                //will give rotateAndReset() a chance to work, which includes creating a fresh file handle.
-            }
+        if (writer != null && writer.needsRotation()) {
+            doRotationAndRemoveWriter(writerKey, writer);
         }
     }
 }
 
+    private AbstractHDFSWriter getOrCreateWriter(String writerKey, Tuple tuple) throws IOException {
+        AbstractHDFSWriter writer;
+
+        writer = writers.get(writerKey);
+        if (writer == null) {
+            if (writers.size() >= maxOpenFiles)
+            {
+                String keyToOldest = getKeyToOldestWriter();
+                AbstractHDFSWriter oldest = writers.get(keyToOldest);
+                rotateOutputFile(oldest);
+                writers.remove(keyToOldest);
--- End diff --

Is the oldest entry removed to maintain the max size of the writers hashtable (the max open files limit)? nit: most of the above can be taken care of if the writers map were a LinkedHashMap extension with LRU logic.
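
For reference, a minimal sketch of that nit, under the stated assumption that eviction should also rotate the evicted writer (the anonymous subclass below is illustrative, not from the PR; it relies on the bolt's existing `maxOpenFiles`, `rotateOutputFile`, and `LOG` members):

```java
// Sketch: an access-ordered LinkedHashMap that evicts the least recently
// used writer once maxOpenFiles is exceeded, rotating it on the way out.
private final Map<String, AbstractHDFSWriter> writers =
        new LinkedHashMap<String, AbstractHDFSWriter>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, AbstractHDFSWriter> eldest) {
                if (size() <= maxOpenFiles) {
                    return false; // still under the limit, keep everything
                }
                try {
                    rotateOutputFile(eldest.getValue()); // close out the evicted writer
                } catch (IOException e) {
                    LOG.warn("IOException while rotating evicted writer", e);
                }
                return true; // drop the eldest entry
            }
        };
```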


> storm-hdfs should support writing to multiple files
> ---
>
> Key: STORM-1464
> URL: https://issues.apache.org/jira/browse/STORM-1464
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-hdfs
>Reporter: Aaron Dossett
>Assignee: Aaron Dossett
>  Labels: avro
>
> Examples of when this is needed include:
> - One avro bolt writing multiple schemas, each of which requires a different 
> file. Schema evolution is a common use of avro, and the avro bolt should 
> support it seamlessly.
> - Partitioning output to different directories based on the tuple contents.  
> For example, if the tuple contains a "USER" field, it should be possible to 
> partition based on that value.





[GitHub] storm pull request: STORM-1464: Support multiple file outputs

2016-03-14 Thread arunmahadevan
Github user arunmahadevan commented on the pull request:

https://github.com/apache/storm/pull/1044#issuecomment-196194151
  
@dossett @harshach the changes look good overall, but I have a few 
questions/comments; once those are addressed I will review again.




[jira] [Commented] (STORM-1464) storm-hdfs should support writing to multiple files

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192889#comment-15192889
 ] 

ASF GitHub Bot commented on STORM-1464:
---

Github user arunmahadevan commented on the pull request:

https://github.com/apache/storm/pull/1044#issuecomment-196194151
  
@dossett @harshach the changes look good overall, but I have a few 
questions/comments; once those are addressed I will review again.


> storm-hdfs should support writing to multiple files
> ---
>
> Key: STORM-1464
> URL: https://issues.apache.org/jira/browse/STORM-1464
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-hdfs
>Reporter: Aaron Dossett
>Assignee: Aaron Dossett
>  Labels: avro
>
> Examples of when this is needed include:
> - One avro bolt writing multiple schemas, each of which requires a different 
> file. Schema evolution is a common use of avro, and the avro bolt should 
> support it seamlessly.
> - Partitioning output to different directories based on the tuple contents.  
> For example, if the tuple contains a "USER" field, it should be possible to 
> partition based on that value.





[GitHub] storm pull request: STORM-956: When the execute() or nextTuple() h...

2016-03-14 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1209#issuecomment-196213692
  
It looks good. I also hope to see this done both through the metrics system 
and by writing an error into zookeeper that would show up on the UI for the 
stuck component, as @revans2 and @bastiliu said. Then users can manually see 
what is happening.




[jira] [Commented] (STORM-956) When the execute() or nextTuple() hang on external resources, stop the Worker's heartbeat

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192943#comment-15192943
 ] 

ASF GitHub Bot commented on STORM-956:
--

Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1209#issuecomment-196213692
  
It looks good. I also hope to see this done both through the metrics system 
and by writing an error into zookeeper that would show up on the UI for the 
stuck component, as @revans2 and @bastiliu said. Then users can manually see 
what is happening.


> When the execute() or nextTuple() hang on external resources, stop the 
> Worker's heartbeat
> -
>
> Key: STORM-956
> URL: https://issues.apache.org/jira/browse/STORM-956
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Chuanlei Ni
>Assignee: Chuanlei Ni
>Priority: Minor
>   Original Estimate: 6h
>  Remaining Estimate: 6h
>
> Sometimes the work threads produced by mk-threads in executor.clj hang on 
> external resources or for other unknown reasons. This makes the workers stop 
> processing tuples. I think it is better to kill such a worker to resolve the 
> "hang". I plan to:
> 1. like `setup-ticks`, send a system-tick to the receive-queue
> 2. have the tuple-action-fn handle this system-tick and remember the time at 
> which it processed the tuple in the executor-data
> 3. when the worker does its local heartbeat, check the time the executor last 
> wrote to the executor-data. If that time is too far in the past (for example, 
> 3 minutes), the worker skips the heartbeat, so the supervisor can deal with 
> the problem (sketched below).
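
A hedged sketch of step 3 above, assuming each executor records the time it last handled a system tick in a shared map (names like `lastTickTime` and `maxStallMillis` are illustrative, not from the patch):

```java
// Sketch: withhold the local heartbeat when any executor has not handled
// a system tick recently, so the supervisor will restart the worker.
private boolean executorsHealthy(Map<String, Long> lastTickTime, long maxStallMillis) {
    long now = System.currentTimeMillis();
    for (Map.Entry<String, Long> entry : lastTickTime.entrySet()) {
        if (now - entry.getValue() > maxStallMillis) { // e.g. 3 minutes
            return false; // a stuck executor: skip the heartbeat
        }
    }
    return true;
}
```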





[GitHub] storm pull request: STORM-956: When the execute() or nextTuple() h...

2016-03-14 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1209#issuecomment-196220384
  
The spout emits messages via the SpoutOutputCollector's emit(). If lots of 
messages fail, the acker will trigger the SpoutOutputCollector to re-emit 
those failed messages, and a deadlock can result: downstream bolts may be slow 
to handle messages, emit() will block, and then the spout/acker thread will 
block. Other messages sent by those threads can then no longer be handled by 
the acker, so the bolts will block too. The scenario may be called a "deadlock 
loop". My point is that this PR is useful for this scenario, because it lets 
us find the deadlock in time.




[jira] [Commented] (STORM-956) When the execute() or nextTuple() hang on external resources, stop the Worker's heartbeat

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192961#comment-15192961
 ] 

ASF GitHub Bot commented on STORM-956:
--

Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1209#issuecomment-196220384
  
The spout emits messages via the SpoutOutputCollector's emit(). If lots of 
messages fail, the acker will trigger the SpoutOutputCollector to re-emit 
those failed messages, and a deadlock can result: downstream bolts may be slow 
to handle messages, emit() will block, and then the spout/acker thread will 
block. Other messages sent by those threads can then no longer be handled by 
the acker, so the bolts will block too. The scenario may be called a "deadlock 
loop". My point is that this PR is useful for this scenario, because it lets 
us find the deadlock in time.


> When the execute() or nextTuple() hang on external resources, stop the 
> Worker's heartbeat
> -
>
> Key: STORM-956
> URL: https://issues.apache.org/jira/browse/STORM-956
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Chuanlei Ni
>Assignee: Chuanlei Ni
>Priority: Minor
>   Original Estimate: 6h
>  Remaining Estimate: 6h
>
> Sometimes the work threads produced by mk-threads in executor.clj hang on 
> external resources or for other unknown reasons. This makes the workers stop 
> processing tuples. I think it is better to kill such a worker to resolve the 
> "hang". I plan to:
> 1. like `setup-ticks`, send a system-tick to the receive-queue
> 2. have the tuple-action-fn handle this system-tick and remember the time at 
> which it processed the tuple in the executor-data
> 3. when the worker does its local heartbeat, check the time the executor last 
> wrote to the executor-data. If that time is too far in the past (for example, 
> 3 minutes), the worker skips the heartbeat, so the supervisor can deal with 
> the problem.





[jira] [Resolved] (STORM-1618) Add the option of passing config directory apart from the file

2016-03-14 Thread Abhishek Agarwal (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-1618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhishek Agarwal resolved STORM-1618.
-
   Resolution: Fixed
Fix Version/s: 2.0.0

> Add the option of passing config directory apart from the file
> --
>
> Key: STORM-1618
> URL: https://issues.apache.org/jira/browse/STORM-1618
> Project: Apache Storm
>  Issue Type: Improvement
>Reporter: Abhishek Agarwal
>Assignee: Abhishek Agarwal
>Priority: Trivial
> Fix For: 2.0.0
>
>
> The error message shown when the configuration file doesn't exist is also 
> confusing:
> `Error: Cannot find configuration directory: /etc/storm/conf`
> The binary is actually looking for a file.





[GitHub] storm pull request: [STORM-1252] port backtype.storm.stats to java

2016-03-14 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1147#issuecomment-196277268
  
@revans2 any update on this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-1252) port backtype.storm.stats to java

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193149#comment-15193149
 ] 

ASF GitHub Bot commented on STORM-1252:
---

Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1147#issuecomment-196277268
  
@revans2 any update on this?


> port backtype.storm.stats to java
> -
>
> Key: STORM-1252
> URL: https://issues.apache.org/jira/browse/STORM-1252
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Cody
>  Labels: java-migration, jstorm-merger
>
> Clojure methods for getting/setting built-in statistics; mostly wrappers 
> around some java classes.





[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-14 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55989920
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -565,19 +578,169 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
                 .aggregate(inputFields, agg, functionFields)
                 .chainEnd();
     }
-
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+     *
+     * @param windowCount represents window tuples count
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return
+     */
+    public Stream tumblingCountWindow(int windowCount, Fields inputFields, Aggregator aggregator, Fields functionFields) {
--- End diff --

The various windowing configurations can be expressed with 
`window(WindowConfig windowConfig, ...)`. If we are adding wrappers over it, 
can you please maintain compatibility with the core APIs?

i.e.,
```java
tumblingWindow(Count count)
tumblingWindow(Duration duration)
window(Count windowLength, Count slidingInterval)
window(Duration windowLength, Duration slidingInterval)
```
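
For illustration, a hedged sketch of how such wrappers could delegate to the core API; the `WindowConfig` factory names and the `window(...)` parameter list below are assumptions for illustration, not taken from the PR:

```java
// Sketch only: wrappers that stay compatible with the core windowing API
// by delegating to window(WindowConfig, ...). Factory names are assumed.
public Stream tumblingWindow(Count count, Fields inputFields, Aggregator aggregator, Fields functionFields) {
    return window(TumblingCountWindow.of(count.value), inputFields, aggregator, functionFields);
}

public Stream window(Count windowLength, Count slidingInterval, Fields inputFields, Aggregator aggregator, Fields functionFields) {
    return window(SlidingCountWindow.of(windowLength.value, slidingInterval.value), inputFields, aggregator, functionFields);
}
```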




[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193154#comment-15193154
 ] 

ASF GitHub Bot commented on STORM-676:
--

Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55989920
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -565,19 +578,169 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
                 .aggregate(inputFields, agg, functionFields)
                 .chainEnd();
     }
-
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+     *
+     * @param windowCount represents window tuples count
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return
+     */
+    public Stream tumblingCountWindow(int windowCount, Fields inputFields, Aggregator aggregator, Fields functionFields) {
--- End diff --

The various windowing configurations can be expressed with 
`window(WindowConfig windowConfig, ...)`. If we are adding wrappers over it, 
can you please maintain compatibility with the core APIs?

i.e.,
```java
tumblingWindow(Count count)
tumblingWindow(Duration duration)
window(Count windowLength, Count slidingInterval)
window(Duration windowLength, Duration slidingInterval)
```


> Storm Trident support for sliding/tumbling windows
> --
>
> Key: STORM-676
> URL: https://issues.apache.org/jira/browse/STORM-676
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Sriharsha Chintalapani
>Assignee: Satish Duggana
> Fix For: 1.0.0, 2.0.0
>
> Attachments: StormTrident_windowing_support-676.pdf
>
>






[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-14 Thread arunmahadevan
Github user arunmahadevan commented on the pull request:

https://github.com/apache/storm/pull/1072#issuecomment-196279990
  
Haven't gone through the code in great detail, but when I ran a simple 
topology with a tumbling time window of 3 secs:

- If the spout emits a batch and sleeps, the result of the window 
aggregation is never emitted (ideally it should have been emitted at 3 secs).

- If the spout periodically emits batches (one batch every 5 secs), the time 
when the window output is generated always coincides with the time the spout 
emits the batches (i.e. at 10, 15, 20 secs), whereas one would expect the 
result to be emitted at 6, 12, 15, 18 etc. Also, only one result is emitted at 
time t = 10 secs, whereas two batches are complete at this time.

Can you verify whether this is the current behavior and, if so, fix it?




[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193155#comment-15193155
 ] 

ASF GitHub Bot commented on STORM-676:
--

Github user arunmahadevan commented on the pull request:

https://github.com/apache/storm/pull/1072#issuecomment-196279990
  
Haven't gone through the code in great detail, but when I ran a simple 
topology with a tumbling time window of 3 secs:

- If the spout emits a batch and sleeps, the result of the window 
aggregation is never emitted (ideally it should have been emitted at 3 secs).

- If the spout periodically emits batches (one batch every 5 secs), the time 
when the window output is generated always coincides with the time the spout 
emits the batches (i.e. at 10, 15, 20 secs), whereas one would expect the 
result to be emitted at 6, 12, 15, 18 etc. Also, only one result is emitted at 
time t = 10 secs, whereas two batches are complete at this time.

Can you verify whether this is the current behavior and, if so, fix it?


> Storm Trident support for sliding/tumbling windows
> --
>
> Key: STORM-676
> URL: https://issues.apache.org/jira/browse/STORM-676
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Sriharsha Chintalapani
>Assignee: Satish Duggana
> Fix For: 1.0.0, 2.0.0
>
> Attachments: StormTrident_windowing_support-676.pdf
>
>






[GitHub] storm pull request: [STORM-1615] Update state checkpointing doc wi...

2016-03-14 Thread arunmahadevan
GitHub user arunmahadevan opened a pull request:

https://github.com/apache/storm/pull/1210

[STORM-1615] Update state checkpointing doc with the acking behavior



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/arunmahadevan/storm STORM-1615

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1210.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1210


commit 9fee0172f90a4f5e18369107e3cf3c71eb9d2f79
Author: Arun Mahadevan 
Date:   2016-03-14T13:02:42Z

[STORM-1615] Update state checkpointing doc with the acking behavior






[jira] [Commented] (STORM-1615) Update state checkpointing doc with bolt's acking contract

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193246#comment-15193246
 ] 

ASF GitHub Bot commented on STORM-1615:
---

GitHub user arunmahadevan opened a pull request:

https://github.com/apache/storm/pull/1210

[STORM-1615] Update state checkpointing doc with the acking behavior



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/arunmahadevan/storm STORM-1615

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1210.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1210


commit 9fee0172f90a4f5e18369107e3cf3c71eb9d2f79
Author: Arun Mahadevan 
Date:   2016-03-14T13:02:42Z

[STORM-1615] Update state checkpointing doc with the acking behavior




> Update state checkpointing doc with bolt's acking contract
> --
>
> Key: STORM-1615
> URL: https://issues.apache.org/jira/browse/STORM-1615
> Project: Apache Storm
>  Issue Type: Bug
>Reporter: Arun Mahadevan
>Assignee: Arun Mahadevan
> Fix For: 1.0.0, 2.0.0
>
>
> Update 
> https://github.com/apache/storm/blob/asf-site/documentation/State-checkpointing.md





[jira] [Created] (STORM-1622) Topology jars referring to shaded classes of older version of storm jar cannot be run in Storm 1.0.0

2016-03-14 Thread Abhishek Agarwal (JIRA)
Abhishek Agarwal created STORM-1622:
---

 Summary: Topology jars referring to shaded classes of older 
version of storm jar cannot be run in Storm 1.0.0
 Key: STORM-1622
 URL: https://issues.apache.org/jira/browse/STORM-1622
 Project: Apache Storm
  Issue Type: Bug
  Components: storm-core
Reporter: Abhishek Agarwal
Assignee: Abhishek Agarwal


This commit 
https://github.com/apache/storm/commit/12fdaa01b20eb144bf18a231a05c92873c90aa09

changes the package names of classes shaded inside storm. These classes are 
shipped inside the maven release of the storm-core jar and can be depended 
upon by a topology jar. A jar built with an older version of storm-core and 
using this dependency won't run on a newer version of the storm cluster.





[jira] [Commented] (STORM-1622) Topology jars referring to shaded classes of older version of storm jar cannot be run in Storm 1.0.0

2016-03-14 Thread Abhishek Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193267#comment-15193267
 ] 

Abhishek Agarwal commented on STORM-1622:
-

Got this exception while submitting a topology built with 0.9.6 on a Storm 
1.0.0 cluster:
java.lang.NoClassDefFoundError: org/apache/storm/guava/base/Preconditions

[~revans2] what do you think of this? It can be fixed in 1.0.0 using the 
ClientJarTransformer with trivial changes. I am not sure how it will work for 
0.10.0, though.

> Topology jars referring to shaded classes of older version of storm jar 
> cannot be run in Storm 1.0.0
> 
>
> Key: STORM-1622
> URL: https://issues.apache.org/jira/browse/STORM-1622
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-core
>Reporter: Abhishek Agarwal
>Assignee: Abhishek Agarwal
>
> This commit 
> https://github.com/apache/storm/commit/12fdaa01b20eb144bf18a231a05c92873c90aa09
> changes the package names of classes shaded inside storm. These classes are 
> shipped inside the maven release of the storm-core jar and can be depended 
> upon by a topology jar. A jar built with an older version of storm-core and 
> using this dependency won't run on a newer version of the storm cluster.





[jira] [Commented] (STORM-1622) Topology jars referring to shaded classes of older version of storm jar cannot be run in Storm 1.0.0

2016-03-14 Thread Robert Joseph Evans (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193270#comment-15193270
 ] 

Robert Joseph Evans commented on STORM-1622:


Topology jars are not supposed to depend on things that storm has shaded. 
Shading the jars is an attempt to hide them from end users and turn them into 
an implementation detail. This is why we ship a dependency-reduced pom which 
does not list the dependencies that we shaded, so that users can pull in the 
dependency themselves and package it with their topology jar. We did this so 
that a topology jar is not tied to a specific storm release as much as before. 
The shading went in as part of 0.10, and I am -1 on undoing 
https://github.com/apache/storm/commit/12fdaa01b20eb144bf18a231a05c92873c90aa09.
If you want to add some new mappings to the hack remapping jar I am OK with 
that, but I would like to see the code updated so that a different warning 
about not depending on shaded storm dependencies is output.

> Topology jars referring to shaded classes of older version of storm jar 
> cannot be run in Storm 1.0.0
> 
>
> Key: STORM-1622
> URL: https://issues.apache.org/jira/browse/STORM-1622
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-core
>Reporter: Abhishek Agarwal
>Assignee: Abhishek Agarwal
>
> This commit 
> https://github.com/apache/storm/commit/12fdaa01b20eb144bf18a231a05c92873c90aa09
> changes the package names of classes shaded inside storm. These classes are 
> shipped inside the maven release of the storm-core jar and can be depended 
> upon by a topology jar. A jar built with an older version of storm-core and 
> using this dependency won't run on a newer version of the storm cluster.





[jira] [Created] (STORM-1623) nimbus.clj's minor bug

2016-03-14 Thread John Fang (JIRA)
John Fang created STORM-1623:


 Summary: nimbus.clj's minor bug
 Key: STORM-1623
 URL: https://issues.apache.org/jira/browse/STORM-1623
 Project: Apache Storm
  Issue Type: Bug
Reporter: John Fang
Assignee: John Fang








[GitHub] storm pull request: fix bug about nimbus.clj

2016-03-14 Thread hustfxj
GitHub user hustfxj opened a pull request:

https://github.com/apache/storm/pull/1211

fix bug about nimbus.clj



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hustfxj/storm nimbus-bug

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1211.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1211


commit 6d59676acb2789238fddbe8d84830abda8a6038b
Author: xiaojian.fxj 
Date:   2016-03-14T13:23:06Z

fix bug about nimbus.clj






[jira] [Commented] (STORM-1623) nimbus.clj's minor bug

2016-03-14 Thread John Fang (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193278#comment-15193278
 ] 

John Fang commented on STORM-1623:
--

https://github.com/apache/storm/pull/1211

> nimbus.clj's minor bug
> --
>
> Key: STORM-1623
> URL: https://issues.apache.org/jira/browse/STORM-1623
> Project: Apache Storm
>  Issue Type: Bug
>Reporter: John Fang
>Assignee: John Fang
>






[jira] [Commented] (STORM-1622) Topology jars referring to shaded classes of older version of storm jar cannot be run in Storm 1.0.0

2016-03-14 Thread Robert Joseph Evans (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193289#comment-15193289
 ] 

Robert Joseph Evans commented on STORM-1622:


[~abhishek.agarwal],

Sorry, I guess I was typing while you were typing. As part of the 0.10 
transition for us, we told everyone that they had to recompile against the 
newer 0.10 version of storm, and gave out explicit instructions including the 
exact versions of the storm dependencies that were in 0.9.x. That way they 
could have a jar that would work both on 0.9.x and on 0.10. Typically it 
involved asking them to run {{mvn dependency:tree}} and save it off to a file, 
then update the version of storm they were compiling against to 0.10.x and 
rerun {{mvn dependency:tree}}. We then had them compare the versions of their 
dependencies and make them match the original ones.

In this case it is different because they are trying to use a class that is 
intended to be private to storm. I would ask them to first pick a version of 
guava that they want to depend on (because it is shaded in 0.9.6 they can pick 
just about any one they want), add it to their pom.xml as a dependency (that 
is not provided), change their imports of the guava code to point to the 
proper location, and then recompile/package. The new jar should work on 0.9.6 
and 0.10, and if you use the hack remapping it will work on 1.0 too. If you 
want to try to port the hack back to 0.10 you can, but I really want it 
yelling at them if they use one of our shaded dependencies.
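
Concretely, the import side of that fix might look like this (a sketch; the shaded path matches the NoClassDefFoundError reported above):

```java
// Before: compiled against storm's shaded copy of guava (private to storm).
// import org.apache.storm.guava.base.Preconditions;

// After: declare guava in the topology's own pom.xml (not provided) and
// import it from its canonical package.
import com.google.common.base.Preconditions;
```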

> Topology jars referring to shaded classes of older version of storm jar 
> cannot be run in Storm 1.0.0
> 
>
> Key: STORM-1622
> URL: https://issues.apache.org/jira/browse/STORM-1622
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-core
>Reporter: Abhishek Agarwal
>Assignee: Abhishek Agarwal
>
> This commit 
> https://github.com/apache/storm/commit/12fdaa01b20eb144bf18a231a05c92873c90aa09
> changes the package names of classes shaded inside storm. These classes are 
> shipped inside the maven release of the storm-core jar and can be depended 
> upon by a topology jar. A jar built with an older version of storm-core and 
> using this dependency won't run on a newer version of the storm cluster.





[jira] [Created] (STORM-1624) add maven central status in README

2016-03-14 Thread Xin Wang (JIRA)
Xin Wang created STORM-1624:
---

 Summary: add maven central status in README
 Key: STORM-1624
 URL: https://issues.apache.org/jira/browse/STORM-1624
 Project: Apache Storm
  Issue Type: Improvement
Reporter: Xin Wang
Assignee: Xin Wang








[jira] [Commented] (STORM-1464) storm-hdfs should support writing to multiple files

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193307#comment-15193307
 ] 

ASF GitHub Bot commented on STORM-1464:
---

Github user dossett commented on the pull request:

https://github.com/apache/storm/pull/1044#issuecomment-196316194
  
Thanks @arunmahadevan and @harshach.  I'll rebase and review feedback this 
week.


> storm-hdfs should support writing to multiple files
> ---
>
> Key: STORM-1464
> URL: https://issues.apache.org/jira/browse/STORM-1464
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-hdfs
>Reporter: Aaron Dossett
>Assignee: Aaron Dossett
>  Labels: avro
>
> Examples of when this is needed include:
> - One avro bolt writing multiple schemas, each of which requires a different 
> file. Schema evolution is a common use of avro, and the avro bolt should 
> support it seamlessly.
> - Partitioning output to different directories based on the tuple contents.  
> For example, if the tuple contains a "USER" field, it should be possible to 
> partition based on that value.





[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-14 Thread satishd
Github user satishd commented on the pull request:

https://github.com/apache/storm/pull/1072#issuecomment-196317380
  
@arunmahadevan What is the batch emit interval in your topology 
configuration? Each triggered result is emitted as part of the nearest 
completed batch: Trident runs in batches, and an emitted trigger becomes part 
of the latest completed batch. These details are mentioned in the design doc 
attached to STORM-676, which is why it may have emitted results at the nearest 
completed batches.
Can you please share your configuration so that I can confirm the behavior?




[GitHub] storm pull request: [STORM-1624] add maven central status in READM...

2016-03-14 Thread vesense
GitHub user vesense opened a pull request:

https://github.com/apache/storm/pull/1212

[STORM-1624] add maven central status in README



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vesense/storm maven-status

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1212.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1212


commit d80e7ab7a3a6f8427fd46ead30ab4e1dd71077aa
Author: Xin Wang 
Date:   2016-03-14T13:40:38Z

add maven status

commit e87ab8e267a21c97f7b41a7fec8a3dc5631e1e4a
Author: Xin Wang 
Date:   2016-03-14T13:45:33Z

minor fix






[jira] [Commented] (STORM-1624) add maven central status in README

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193315#comment-15193315
 ] 

ASF GitHub Bot commented on STORM-1624:
---

GitHub user vesense opened a pull request:

https://github.com/apache/storm/pull/1212

[STORM-1624] add maven central status in README



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vesense/storm maven-status

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1212.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1212


commit d80e7ab7a3a6f8427fd46ead30ab4e1dd71077aa
Author: Xin Wang 
Date:   2016-03-14T13:40:38Z

add maven status

commit e87ab8e267a21c97f7b41a7fec8a3dc5631e1e4a
Author: Xin Wang 
Date:   2016-03-14T13:45:33Z

minor fix




> add maven central status in README
> --
>
> Key: STORM-1624
> URL: https://issues.apache.org/jira/browse/STORM-1624
> Project: Apache Storm
>  Issue Type: Improvement
>Reporter: Xin Wang
>Assignee: Xin Wang
>






[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193313#comment-15193313
 ] 

ASF GitHub Bot commented on STORM-676:
--

Github user satishd commented on the pull request:

https://github.com/apache/storm/pull/1072#issuecomment-196317380
  
@arunmahadevan What is the batch emit interval in your topology 
configuration? Each triggered result is emitted as part of the nearest 
completed batch: Trident runs in batches, and an emitted trigger becomes part 
of the latest completed batch. These details are mentioned in the design doc 
attached to STORM-676, which is why it may have emitted results at the nearest 
completed batches.
Can you please share your configuration so that I can confirm the behavior?


> Storm Trident support for sliding/tumbling windows
> --
>
> Key: STORM-676
> URL: https://issues.apache.org/jira/browse/STORM-676
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Sriharsha Chintalapani
>Assignee: Satish Duggana
> Fix For: 1.0.0, 2.0.0
>
> Attachments: StormTrident_windowing_support-676.pdf
>
>






[GitHub] storm pull request: STORM-1464: Support multiple file outputs

2016-03-14 Thread dossett
Github user dossett commented on the pull request:

https://github.com/apache/storm/pull/1044#issuecomment-196316194
  
Thanks @arunmahadevan and @harshach.  I'll rebase and review feedback this 
week.




[jira] [Commented] (STORM-1483) add storm-mongodb connector

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193319#comment-15193319
 ] 

ASF GitHub Bot commented on STORM-1483:
---

Github user vesense commented on the pull request:

https://github.com/apache/storm/pull/1038#issuecomment-196319894
  
ping @harshach 


> add storm-mongodb connector
> ---
>
> Key: STORM-1483
> URL: https://issues.apache.org/jira/browse/STORM-1483
> Project: Apache Storm
>  Issue Type: Improvement
>Reporter: Xin Wang
>Assignee: Xin Wang
>






[GitHub] storm pull request: [STORM-1483] add storm-mongodb connector

2016-03-14 Thread vesense
Github user vesense commented on the pull request:

https://github.com/apache/storm/pull/1038#issuecomment-196319894
  
ping @harshach 




[GitHub] storm pull request: [STORM-1608] Fix stateful topology acking beha...

2016-03-14 Thread arunmahadevan
Github user arunmahadevan commented on the pull request:

https://github.com/apache/storm/pull/1190#issuecomment-19633
  
@revans2 @ptgoetz I think the comments in this PR are addressed; can you 
take a look and merge? I also raised https://github.com/apache/storm/pull/1210 
to update the doc.




[jira] [Commented] (STORM-1608) Fix stateful topology acking behavior

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193329#comment-15193329
 ] 

ASF GitHub Bot commented on STORM-1608:
---

Github user arunmahadevan commented on the pull request:

https://github.com/apache/storm/pull/1190#issuecomment-19633
  
@revans2 @ptgoetz I think the comments in this PR are addressed; can you 
take a look and merge? I also raised https://github.com/apache/storm/pull/1210 
to update the doc.


> Fix stateful topology acking behavior
> -
>
> Key: STORM-1608
> URL: https://issues.apache.org/jira/browse/STORM-1608
> Project: Apache Storm
>  Issue Type: Bug
>Affects Versions: 1.0.0, 2.0.0
>Reporter: Arun Mahadevan
>Assignee: Arun Mahadevan
>
> Right now the acking is automatically taken care of for the non-stateful 
> bolts in a stateful topology. This leads to double acking if BaseRichBolts 
> are part of the topology. For the non-stateful bolts, it's better to let the 
> bolt do the acking rather than acking automatically.





[jira] [Created] (STORM-1625) guava dependency is being shipped inside storm distribution

2016-03-14 Thread Abhishek Agarwal (JIRA)
Abhishek Agarwal created STORM-1625:
---

 Summary: guava dependency is being shipped inside storm 
distribution
 Key: STORM-1625
 URL: https://issues.apache.org/jira/browse/STORM-1625
 Project: Apache Storm
  Issue Type: Bug
  Components: storm-core, storm-sql
Reporter: Abhishek Agarwal
Assignee: Abhishek Agarwal


We shade guava classes inside storm-core so that clients can work with any 
version of guava without clashes. However, storm-sql-core has a transitive 
dependency on guava, and thus the guava jar is still getting shipped in the 
lib folder.





[jira] [Commented] (STORM-1622) Topology jars referring to shaded classes of older version of storm jar cannot be run in Storm 1.0.0

2016-03-14 Thread Abhishek Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193355#comment-15193355
 ] 

Abhishek Agarwal commented on STORM-1622:
-

I think it should be ok to just add new mappings in the StormShadeRequest. 
Depending on 0.10 doesn't need many changes in a topology, since the 
storm-core packages are still the same. I will add a method for printing a 
warning about not using private classes of storm.

There is another problem related to jar dependencies - STORM-1625


> Topology jars referring to shaded classes of older version of storm jar 
> cannot be run in Storm 1.0.0
> 
>
> Key: STORM-1622
> URL: https://issues.apache.org/jira/browse/STORM-1622
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-core
>Reporter: Abhishek Agarwal
>Assignee: Abhishek Agarwal
>
> This commit 
> https://github.com/apache/storm/commit/12fdaa01b20eb144bf18a231a05c92873c90aa09
> changes the package names of classes shaded inside storm. These classes are 
> shipped inside the maven release of the storm-core jar and can be depended 
> upon by a topology jar. A jar built with an older version of storm-core and 
> using this dependency won't run on a newer version of the storm cluster.





[jira] [Commented] (STORM-1625) guava dependency is being shipped inside storm distribution

2016-03-14 Thread Abhishek Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193363#comment-15193363
 ] 

Abhishek Agarwal commented on STORM-1625:
-

{noformat}
[INFO] +- org.apache.storm:storm-sql-core:jar:1.0.0-SNAPSHOT:compile
[INFO] |  +- org.apache.calcite:calcite-core:jar:1.4.0-incubating:compile
[INFO] |  |  +- org.apache.calcite:calcite-avatica:jar:1.4.0-incubating:compile
[INFO] |  |  +- org.apache.calcite:calcite-linq4j:jar:1.4.0-incubating:compile
[INFO] |  |  +- com.fasterxml.jackson.core:jackson-core:jar:2.6.3:compile
[INFO] |  |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.6.3:compile
[INFO] |  |  +- com.google.guava:guava:jar:16.0.1:compile
[INFO] |  |  \- net.hydromatic:eigenbase-properties:jar:1.1.5:compile
{noformat}
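
A quick way to confirm the leak on a deployed cluster (a diagnostic sketch, not part of any patch; the class name is made up) is to check whether an unshaded guava class resolves on a classpath that includes the lib folder:

{code}
public class GuavaLeakCheck {
    public static void main(String[] args) {
        // If the unshaded guava jar shipped in the lib folder is on the
        // classpath, this load succeeds; with only the relocated (shaded)
        // copy present, it throws ClassNotFoundException.
        try {
            Class.forName("com.google.common.collect.ImmutableList");
            System.out.println("unshaded guava found on the classpath (the leak described above)");
        } catch (ClassNotFoundException e) {
            System.out.println("no unshaded guava on the classpath");
        }
    }
}
{code}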

> guava dependency is being shipped inside storm distribution
> ---
>
> Key: STORM-1625
> URL: https://issues.apache.org/jira/browse/STORM-1625
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-core, storm-sql
>Reporter: Abhishek Agarwal
>Assignee: Abhishek Agarwal
>
> We shade guava classes inside storm-core so that clients can work with any 
> version of guava without clashes. However, storm-sql-core has a transitive 
> dependency on guava, and thus the guava jar is still getting shipped in the 
> lib folder.





Re: Different topologies need different schedulers

2016-03-14 Thread Boyang(Jerry) Peng
This can be solved using the Resource Aware Scheduler, as the resource-aware 
scheduler allows users to specify, on a per-topology basis, which scheduling 
strategy to use.
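
For example, a minimal sketch of picking a strategy per topology (assuming the RAS config key Config.TOPOLOGY_SCHEDULER_STRATEGY, i.e. "topology.scheduler.strategy", and its default strategy class; swap in a custom strategy as needed):

```
import org.apache.storm.Config;

public class PerTopologyStrategy {
    public static void main(String[] args) {
        Config conf = new Config();
        // Under the Resource Aware Scheduler, each topology can name its own
        // scheduling strategy; the class below is the RAS default.
        conf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY,
                "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy");
        // conf is then passed to StormSubmitter.submitTopology(...) as usual.
    }
}
```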

On Sunday, March 13, 2016 9:52 PM, devopts  wrote:
 

 Hi all,
    I think it is an issue that all topologies can only be scheduled by the same 
scheduler.
It doesn't work when I set the "storm.scheduler" value in my topology, such as 
config.put(Config.STORM_SCHEDULE, xxx.xxx).
Different topologies need to be scheduled by different schedulers.
How can this problem be solved?

  

[jira] [Assigned] (STORM-1300) port backtype.storm.scheduler.resource-aware-scheduler-test to java

2016-03-14 Thread Zhuo Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-1300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhuo Liu reassigned STORM-1300:
---

Assignee: Zhuo Liu

> port  backtype.storm.scheduler.resource-aware-scheduler-test to java
> 
>
> Key: STORM-1300
> URL: https://issues.apache.org/jira/browse/STORM-1300
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Zhuo Liu
>  Labels: java-migration, jstorm-merger
>
> Test RAS





[GitHub] storm pull request: Rename README.markdown to README.md.

2016-03-14 Thread jumarko
GitHub user jumarko opened a pull request:

https://github.com/apache/storm/pull/1213

Rename README.markdown to README.md.

The .md extension is the more standard one and is commonly used.
There are already CHANGELOG.md, DEVELOPER.md, and SECURITY.md in this repo, 
so it's also better to be consistent.
Moreover, some tools don't recognize the .markdown extension.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jumarko/storm readme

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1213.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1213


commit 10c48489c5648e36b00b8b92667af1bbc039dfa7
Author: Juraj Martinka 
Date:   2016-03-14T15:44:24Z

Rename README.markdown to README.md.

The .md extension is the more standard one and is commonly used.
There are already CHANGELOG.md, DEVELOPER.md, and SECURITY.md in this repo, 
so it's also better to be consistent.
Moreover, some tools don't recognize the .markdown extension.






[GitHub] storm pull request: [STORM-1611] port org.apache.storm.pacemaker.p...

2016-03-14 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1195#issuecomment-196393789
  
Can anyone help me review this PR? From my perspective, I hope we can 
accelerate the first-phase job.




[jira] [Commented] (STORM-1611) port org.apache.storm.pacemaker.pacemaker to java

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193556#comment-15193556
 ] 

ASF GitHub Bot commented on STORM-1611:
---

Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1195#issuecomment-196393789
  
Can anyone help me review this PR? From my perspective, I hope we can 
accelerate the first-phase job.


> port org.apache.storm.pacemaker.pacemaker to java
> -
>
> Key: STORM-1611
> URL: https://issues.apache.org/jira/browse/STORM-1611
> Project: Apache Storm
>  Issue Type: New Feature
>Reporter: John Fang
>Assignee: John Fang
>






Re: Different topologies need different schedulers

2016-03-14 Thread Li Wang
Hi,
Under the current scheduler implementation, I am afraid this cannot be achieved. 
Would you please specify the reason why you want different topologies to be 
scheduled by different schedulers?

Thanks.
Li


> On 14 Mar 2016, at 10:51 AM, devopts  wrote:
> 
> Hi all,
> I think it is an issue that all topologies can only be scheduled by the same 
> scheduler.
> It doesn't work when I set the "storm.scheduler" value in my topology, such as 
> config.put(Config.STORM_SCHEDULE, xxx.xxx).
> Different topologies need to be scheduled by different schedulers.
> How can this problem be solved?



[GitHub] storm pull request: [STORM-1523] util.clj available-port conversio...

2016-03-14 Thread redsanket
Github user redsanket commented on the pull request:

https://github.com/apache/storm/pull/1073#issuecomment-196423667
  
@harshach upmerged and squashed




[jira] [Commented] (STORM-1523) Translate Available Ports task in util.java

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193685#comment-15193685
 ] 

ASF GitHub Bot commented on STORM-1523:
---

Github user redsanket commented on the pull request:

https://github.com/apache/storm/pull/1073#issuecomment-196423667
  
@harshach upmerged and squashed


> Translate Available Ports task in util.java
> ---
>
> Key: STORM-1523
> URL: https://issues.apache.org/jira/browse/STORM-1523
> Project: Apache Storm
>  Issue Type: Sub-task
>  Components: storm-core
>Reporter: Sanket Reddy
>






[jira] [Commented] (STORM-1623) nimbus.clj's minor bug

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193698#comment-15193698
 ] 

ASF GitHub Bot commented on STORM-1623:
---

Github user redsanket commented on the pull request:

https://github.com/apache/storm/pull/1211#issuecomment-196424927
  
+1


> nimbus.clj's minor bug
> --
>
> Key: STORM-1623
> URL: https://issues.apache.org/jira/browse/STORM-1623
> Project: Apache Storm
>  Issue Type: Bug
>Reporter: John Fang
>Assignee: John Fang
>






[GitHub] storm pull request: [STORM-1623] fix bug about nimbus.clj

2016-03-14 Thread redsanket
Github user redsanket commented on the pull request:

https://github.com/apache/storm/pull/1211#issuecomment-196424927
  
+1




[GitHub] storm pull request: STORM-822: Kafka Spout New Consumer API

2016-03-14 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1131#discussion_r56041205
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the output streams associated with each topic, and provides a public API to
+ * declare output streams and emit tuples, on the appropriate stream, for all the topics specified.
+ */
+public class KafkaSpoutStreams implements Serializable {
+    private final Map<String, KafkaSpoutStream> topicToStream;
+
+    private KafkaSpoutStreams(Builder builder) {
+        this.topicToStream = builder.topicToStream;
+    }
+
+    /**
+     * @param topic the topic for which to get output fields
+     * @return the output fields declared
+     */
+    public Fields getOutputFields(String topic) {
+        if (topicToStream.containsKey(topic)) {
+            return topicToStream.get(topic).getOutputFields();
+        }
+        throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic);
+    }
+
+    /**
+     * @param topic the topic for which to get the stream id
+     * @return the id of the stream to where the tuples are emitted
+     */
+    public String getStreamId(String topic) {
+        if (topicToStream.containsKey(topic)) {
+            return topicToStream.get(topic).getStreamId();
+        }
+        throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic);
+    }
+
+    /**
+     * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream}
+     */
+    public List<String> getTopics() {
+        return new ArrayList<>(topicToStream.keySet());
+    }
+
+    void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for (KafkaSpoutStream stream : topicToStream.values()) {
+            declarer.declareStream(stream.getStreamId(), stream.getOutputFields());
+        }
+    }
+
+    void emit(SpoutOutputCollector collector, MessageId messageId) {
+        collector.emit(getStreamId(messageId.topic()), messageId.getTuple(), messageId);
--- End diff --

@abhishekagarwal87 @harshach @revans2 I have a patch I am about to upload 
that I believe addresses most or all of these points. It will give an option to 
have a behavior identical to the previous Kafka Spout, and an option to keep an 
upper limit on the amount of memory (i.e. pending records in memory) used by 
each spout.
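
As a rough illustration of that second option (nothing below is from the patch; all names are invented), an upper bound on pending records can be kept by gating the next poll on the count of emitted-but-unacked tuples:

```
import java.util.concurrent.atomic.AtomicInteger;

// Invented sketch: cap the memory used by pending (emitted but not yet
// acked or failed) records by pausing Kafka polls once a limit is reached.
public class PendingCap {
    private final int maxPending;
    private final AtomicInteger pending = new AtomicInteger();

    public PendingCap(int maxPending) {
        this.maxPending = maxPending;
    }

    boolean mayPoll()  { return pending.get() < maxPending; } // consulted before each poll
    void onEmit()      { pending.incrementAndGet(); }         // when a tuple is emitted
    void onAckOrFail() { pending.decrementAndGet(); }         // from ack()/fail()
}
```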




[jira] [Commented] (STORM-822) As a storm developer I’d like to use the new kafka consumer API (0.8.3) to reduce dependencies and use long term supported kafka apis

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193702#comment-15193702
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1131#discussion_r56041205
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the output streams associated with each topic, and provides a public API to
+ * declare output streams and emit tuples, on the appropriate stream, for all the topics specified.
+ */
+public class KafkaSpoutStreams implements Serializable {
+    private final Map<String, KafkaSpoutStream> topicToStream;
+
+    private KafkaSpoutStreams(Builder builder) {
+        this.topicToStream = builder.topicToStream;
+    }
+
+    /**
+     * @param topic the topic for which to get output fields
+     * @return the output fields declared
+     */
+    public Fields getOutputFields(String topic) {
+        if (topicToStream.containsKey(topic)) {
+            return topicToStream.get(topic).getOutputFields();
+        }
+        throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic);
+    }
+
+    /**
+     * @param topic the topic for which to get the stream id
+     * @return the id of the stream to where the tuples are emitted
+     */
+    public String getStreamId(String topic) {
+        if (topicToStream.containsKey(topic)) {
+            return topicToStream.get(topic).getStreamId();
+        }
+        throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic);
+    }
+
+    /**
+     * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream}
+     */
+    public List<String> getTopics() {
+        return new ArrayList<>(topicToStream.keySet());
+    }
+
+    void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for (KafkaSpoutStream stream : topicToStream.values()) {
+            declarer.declareStream(stream.getStreamId(), stream.getOutputFields());
+        }
+    }
+
+    void emit(SpoutOutputCollector collector, MessageId messageId) {
+        collector.emit(getStreamId(messageId.topic()), messageId.getTuple(), messageId);
--- End diff --

@abhishekagarwal87 @harshach @revans2 I have a patch I am about to upload 
that I believe addresses most or all of these points. It will give an option to 
have a behavior identical to the previous Kafka Spout, and an option to keep an 
upper limit on the amount of memory (i.e. pending records in memory) used by 
each spout.


> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to 
> reduce dependencies and use long term supported kafka apis 
> --
>
> Key: STORM-822
> URL: https://issues.apache.org/jira/browse/STORM-822
> Project: Apache Storm
>  Issue Type: Story
>  Components: storm-kafka
>Reporter: Thomas Becker
>Assignee: Hugo Louro
>






[jira] [Commented] (STORM-1608) Fix stateful topology acking behavior

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193788#comment-15193788
 ] 

ASF GitHub Bot commented on STORM-1608:
---

Github user ptgoetz commented on the pull request:

https://github.com/apache/storm/pull/1190#issuecomment-196447926
  
+1


> Fix stateful topology acking behavior
> -
>
> Key: STORM-1608
> URL: https://issues.apache.org/jira/browse/STORM-1608
> Project: Apache Storm
>  Issue Type: Bug
>Affects Versions: 1.0.0, 2.0.0
>Reporter: Arun Mahadevan
>Assignee: Arun Mahadevan
>
> Right now the acking is automatically taken care of for the non-stateful 
> bolts in a stateful topology. This leads to double acking if BaseRichBolts 
> are part of the topology. For the non-stateful bolts, it's better to let the 
> bolt do the acking rather than acking automatically.





[GitHub] storm pull request: [STORM-1608] Fix stateful topology acking beha...

2016-03-14 Thread ptgoetz
Github user ptgoetz commented on the pull request:

https://github.com/apache/storm/pull/1190#issuecomment-196447926
  
+1




[jira] [Commented] (STORM-1608) Fix stateful topology acking behavior

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193807#comment-15193807
 ] 

ASF GitHub Bot commented on STORM-1608:
---

Github user revans2 commented on the pull request:

https://github.com/apache/storm/pull/1190#issuecomment-196450856
  
+1


> Fix stateful topology acking behavior
> -
>
> Key: STORM-1608
> URL: https://issues.apache.org/jira/browse/STORM-1608
> Project: Apache Storm
>  Issue Type: Bug
>Affects Versions: 1.0.0, 2.0.0
>Reporter: Arun Mahadevan
>Assignee: Arun Mahadevan
>
> Right now the acking is automatically taken care of for the non-stateful 
> bolts in a stateful topology. This leads to double acking if BaseRichBolts 
> are part of the topology. For the non-stateful bolts, it's better to let the 
> bolt do the acking rather than acking automatically.





[GitHub] storm pull request: [STORM-1608] Fix stateful topology acking beha...

2016-03-14 Thread revans2
Github user revans2 commented on the pull request:

https://github.com/apache/storm/pull/1190#issuecomment-196450856
  
+1




[jira] [Commented] (STORM-1625) guava dependency is being shipped inside storm distribution

2016-03-14 Thread Robert Joseph Evans (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193849#comment-15193849
 ] 

Robert Joseph Evans commented on STORM-1625:


I wasn't paying attention to the SQL work, but we never should have included 
the SQL jars in lib. It is an external library that is a layer on top of Storm 
and should be treated as such. I would really prefer to see the SQL code 
compiled down into a topology jar that we can put in the distribution in a 
standard location so {{storm sql}} can find it, but it should not be on the 
classpath for everything.

> guava dependency is being shipped inside storm distribution
> ---
>
> Key: STORM-1625
> URL: https://issues.apache.org/jira/browse/STORM-1625
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-core, storm-sql
>Reporter: Abhishek Agarwal
>Assignee: Abhishek Agarwal
>
> We shade guava classes inside storm-core so that clients can work with any 
> version of guava without clashes. However, storm-sql-core has a transitive 
> dependency on guava, and thus the guava jar is still getting shipped in the 
> lib folder.





[jira] [Commented] (STORM-1602) Blobstore UTs are failed on Windows

2016-03-14 Thread P. Taylor Goetz (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193871#comment-15193871
 ] 

P. Taylor Goetz commented on STORM-1602:


+1 for fixing this for 1.0. I don't think we should drop support for Windows.

> Blobstore UTs are failed on Windows
> ---
>
> Key: STORM-1602
> URL: https://issues.apache.org/jira/browse/STORM-1602
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-core
>Affects Versions: 1.0.0
> Environment: Windows
>Reporter: Jungtaek Lim
>Priority: Critical
>
> Blobstore related UTs are failed on Windows.
> {code}
> ---
> Test set: org.apache.storm.blobstore.BlobStoreTest
> ---
> Tests run: 7, Failures: 0, Errors: 7, Skipped: 0, Time elapsed: 2.306 sec <<< 
> FAILURE! - in org.apache.storm.blobstore.BlobStoreTest
> testMultipleLocalFs(org.apache.storm.blobstore.BlobStoreTest)  Time elapsed: 
> 1.798 sec  <<< ERROR!
> java.nio.file.AccessDeniedException: 
> D:\storm\storm-core\target\blob-store-test-19f8e973-7c1b-4638-8679-2eb1adcac396\blobs\571\data_other\1457050287771.tmp
>  -> 
> D:\storm\storm-core\target\blob-store-test-19f8e973-7c1b-4638-8679-2eb1adcac396\blobs\571\data_other\data
>   at 
> sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
>   at 
> sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
>   at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:301)
>   at 
> sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
>   at java.nio.file.Files.move(Files.java:1347)
>   at 
> org.apache.storm.blobstore.LocalFsBlobStoreFile.commit(LocalFsBlobStoreFile.java:127)
>   at 
> org.apache.storm.blobstore.BlobStore$BlobStoreFileOutputStream.close(BlobStore.java:324)
>   at 
> org.apache.storm.blobstore.BlobStoreTest.testMultiple(BlobStoreTest.java:397)
>   at 
> org.apache.storm.blobstore.BlobStoreTest.testMultipleLocalFs(BlobStoreTest.java:168)
> testMultipleLocalFs(org.apache.storm.blobstore.BlobStoreTest)  Time elapsed: 
> 1.8 sec  <<< ERROR!
> java.io.IOException: Unable to delete file: 
> target\blob-store-test-19f8e973-7c1b-4638-8679-2eb1adcac396\blobs\571\data_other\data
>   at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2279)
>   at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653)
>   at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535)
>   at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2270)
>   at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653)
>   at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535)
>   at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2270)
>   at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653)
>   at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535)
>   at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2270)
>   at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653)
>   at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535)
>   at 
> org.apache.storm.blobstore.BlobStoreTest.cleanup(BlobStoreTest.java:74)
> testGetFileLength(org.apache.storm.blobstore.BlobStoreTest)  Time elapsed: 
> 0.067 sec  <<< ERROR!
> java.io.IOException: Unable to delete file: 
> target\blob-store-test-b6d39145-11ea-4aa7-ae30-28bda603fb3a\blobs\1017\data_test\data
>   at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2279)
>   at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653)
>   at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535)
>   at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2270)
>   at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653)
>   at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535)
>   at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2270)
>   at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653)
>   at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535)
>   at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2270)
>   at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653)
>   at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535)
>   at 
> org.apache.storm.blobstore.BlobStoreTest.cleanup(BlobStoreTest.java:74)
> testBasicLocalFs(org.apache.storm.blobstore.BlobStoreTest)  Time elapsed: 
> 0.124 sec  <<< ERROR!
> java.lang.RuntimeException: java.nio.file.D

[jira] [Commented] (STORM-1625) guava dependency is being shipped inside storm distribution

2016-03-14 Thread Robert Joseph Evans (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193911#comment-15193911
 ] 

Robert Joseph Evans commented on STORM-1625:


OK, so I looked at how the code works for submission, and we should still be 
able to move the SQL off to its own location and not make it part of lib; I 
propose {{./sql-lib/}}. When running {{storm sql}} we would add everything in 
that directory to the classpath. Additionally, in 
{{StormSqlImpl.packageTopology}}, instead of just dumping out the generated code 
we would also walk all of the jars in sql-lib and package them with it too. 
This also opens up the possibility of having UDFs, because we could add the UDF 
jar to the classpath and to the newly packaged topology jar.

> guava dependency is being shipped inside storm distribution
> ---
>
> Key: STORM-1625
> URL: https://issues.apache.org/jira/browse/STORM-1625
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-core, storm-sql
>Reporter: Abhishek Agarwal
>Assignee: Abhishek Agarwal
>
> We shade guava classes inside storm-core so that clients can work with any 
> version of guava without clashes. However, storm-sql-core has a transitive 
> dependency on guava, and thus the guava jar is still getting shipped in the 
> lib folder.





[jira] [Created] (STORM-1626) HBaseBolt tuple counts too high in storm ui.

2016-03-14 Thread Priyank Shah (JIRA)
Priyank Shah created STORM-1626:
---

 Summary: HBaseBolt tuple counts too high in storm ui.
 Key: STORM-1626
 URL: https://issues.apache.org/jira/browse/STORM-1626
 Project: Apache Storm
  Issue Type: Bug
  Components: storm-hbase
Affects Versions: 1.0.0
Reporter: Priyank Shah
Priority: Blocker


When a Storm topology has an HBaseBolt, the Storm UI numbers for executed and 
acked tuples seem too high for the HBaseBolt component.





[jira] [Commented] (STORM-1625) guava dependency is being shipped inside storm distribution

2016-03-14 Thread P. Taylor Goetz (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193936#comment-15193936
 ] 

P. Taylor Goetz commented on STORM-1625:


I totally agree that the SQL dependencies should not be in lib. I feel this is 
still an experimental feature. For a short-term fix we can just remove the 
maven bits that put them there, and file a JIRA for a long-term solution.

> guava dependency is being shipped inside storm distribution
> ---
>
> Key: STORM-1625
> URL: https://issues.apache.org/jira/browse/STORM-1625
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-core, storm-sql
>Reporter: Abhishek Agarwal
>Assignee: Abhishek Agarwal
>
> We shade guava classes inside storm-core so that clients can work with any 
> version of guava without clashes. However, storm-sql-core has a transitive 
> dependency on guava, and thus the guava jar is still getting shipped in the 
> lib folder.





[GitHub] storm pull request: [STORM-1252] port backtype.storm.stats to java

2016-03-14 Thread revans2
Github user revans2 commented on the pull request:

https://github.com/apache/storm/pull/1147#issuecomment-196482345
  
@unsleepy22 sorry I had a minor personal emergency come up last Friday.  It 
is fixed now, but I am still trying to catch up on everything that happened 
Friday and over the weekend.  I'll try to get back to looking at this soon.  If 
I don't say anything in a day or so please ping me again.




[jira] [Commented] (STORM-1252) port backtype.storm.stats to java

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193944#comment-15193944
 ] 

ASF GitHub Bot commented on STORM-1252:
---

Github user revans2 commented on the pull request:

https://github.com/apache/storm/pull/1147#issuecomment-196482345
  
@unsleepy22 sorry I had a minor personal emergency come up last Friday.  It 
is fixed now, but I am still trying to catch up on everything that happened 
Friday and over the weekend.  I'll try to get back to looking at this soon.  If 
I don't say anything in a day or so please ping me again.


> port backtype.storm.stats to java
> -
>
> Key: STORM-1252
> URL: https://issues.apache.org/jira/browse/STORM-1252
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Cody
>  Labels: java-migration, jstorm-merger
>
> clojure methods for getting/setting built-in statistics.  Mostly wrappers 
> around some java classes.





[GitHub] storm pull request: [STORM-1608] Fix stateful topology acking beha...

2016-03-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/1190




[GitHub] storm pull request: [STORM-1608] Fix stateful topology acking beha...

2016-03-14 Thread ptgoetz
Github user ptgoetz commented on the pull request:

https://github.com/apache/storm/pull/1190#issuecomment-196487801
  
Merged to master and 1.x-branch




[jira] [Commented] (STORM-1608) Fix stateful topology acking behavior

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193987#comment-15193987
 ] 

ASF GitHub Bot commented on STORM-1608:
---

Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/1190


> Fix stateful topology acking behavior
> -
>
> Key: STORM-1608
> URL: https://issues.apache.org/jira/browse/STORM-1608
> Project: Apache Storm
>  Issue Type: Bug
>Affects Versions: 1.0.0, 2.0.0
>Reporter: Arun Mahadevan
>Assignee: Arun Mahadevan
>
> Right now the acking is automatically taken care of for the non-stateful 
> bolts in a stateful topology. This leads to double acking if BaseRichBolts 
> are part of the topology. For the non-stateful bolts, it's better to let the 
> bolt do the acking rather than acking automatically.





[jira] [Commented] (STORM-1608) Fix stateful topology acking behavior

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193988#comment-15193988
 ] 

ASF GitHub Bot commented on STORM-1608:
---

Github user ptgoetz commented on the pull request:

https://github.com/apache/storm/pull/1190#issuecomment-196487801
  
Merged to master and 1.x-branch


> Fix stateful topology acking behavior
> -
>
> Key: STORM-1608
> URL: https://issues.apache.org/jira/browse/STORM-1608
> Project: Apache Storm
>  Issue Type: Bug
>Affects Versions: 1.0.0, 2.0.0
>Reporter: Arun Mahadevan
>Assignee: Arun Mahadevan
>
> Right now the acking is automatically taken care of for the non-stateful 
> bolts in a stateful topology. This leads to double acking if BaseRichBolts 
> are part of the topology. For the non-stateful bolts, it's better to let the 
> bolt do the acking rather than acking automatically.





[jira] [Resolved] (STORM-1608) Fix stateful topology acking behavior

2016-03-14 Thread P. Taylor Goetz (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-1608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

P. Taylor Goetz resolved STORM-1608.

   Resolution: Fixed
Fix Version/s: 2.0.0
   1.0.0

> Fix stateful topology acking behavior
> -
>
> Key: STORM-1608
> URL: https://issues.apache.org/jira/browse/STORM-1608
> Project: Apache Storm
>  Issue Type: Bug
>Affects Versions: 1.0.0, 2.0.0
>Reporter: Arun Mahadevan
>Assignee: Arun Mahadevan
> Fix For: 1.0.0, 2.0.0
>
>
> Right now the acking is automatically taken care of for the non-stateful 
> bolts in a stateful topology. This leads to double acking if BaseRichBolts 
> are part of the topology. For the non-stateful bolts, it's better to let the 
> bolt do the acking rather than acking automatically.





[GitHub] storm pull request: !!! DO NOT MERGE !!! STORM-1617 preview !!! DO...

2016-03-14 Thread revans2
Github user revans2 closed the pull request at:

https://github.com/apache/storm/pull/1203




[GitHub] storm pull request: !!! DO NOT MERGE !!! STORM-1617 preview !!! DO...

2016-03-14 Thread revans2
Github user revans2 commented on the pull request:

https://github.com/apache/storm/pull/1203#issuecomment-196529163
  
Sounds good. I will go ahead with doing all of this in Subversion, and put 
up pull requests in Git for release-specific docs that we would copy to 
Subversion when we do a release.




[jira] [Commented] (STORM-1617) storm.apache.org has no release specific documentation

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15194178#comment-15194178
 ] 

ASF GitHub Bot commented on STORM-1617:
---

Github user revans2 closed the pull request at:

https://github.com/apache/storm/pull/1203


> storm.apache.org has no release specific documentation
> --
>
> Key: STORM-1617
> URL: https://issues.apache.org/jira/browse/STORM-1617
> Project: Apache Storm
>  Issue Type: Bug
>  Components: asf-site
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans
>
> Our current documentation on http://storm.apache.org/ has no place to put 
> release-specific documentation.  Every other project I know of has a little 
> bit of generic information, but most of the site is tied to a specific 
> release.  I would like to copy that model, and I propose that the following 
> files be maintained only on the asf-site branch.  All other files will be 
> moved to a documentation directory on the individual branches, and a copy of 
> each, along with the generated javadocs, will be copied to a release-specific 
> directory for each release.
> ./index.html
> ./releases.html (Combined downloads + documentation for each release)
> ./contribute/BYLAWS.md
> ./contribute/Contributing-to-Storm.md
> ./contribute/People.md (Perhaps just PMC for this)
> ./_posts
> ./news.html
> ./feed.xml
> ./getting-help.html
> ./LICENSE.html (Apache License)
> ./talksAndVideos.md
> Possibly also:
> ./about/deployment.md
> ./about/fault-tolerant.md
> ./about/free-and-open-source.md
> ./about/guarantees-data-processing.md
> ./about/integrates.md
> ./about/multi-language.md
> ./about/scalable.md
> ./about/simple-api.md
> ./about.md





[jira] [Commented] (STORM-1617) storm.apache.org has no release specific documentation

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15194177#comment-15194177
 ] 

ASF GitHub Bot commented on STORM-1617:
---

Github user revans2 commented on the pull request:

https://github.com/apache/storm/pull/1203#issuecomment-196529163
  
Sounds good. I will go ahead with doing all of this in Subversion, and put 
up pull requests in Git for release-specific docs that we would copy to 
Subversion when we do a release.


> storm.apache.org has no release specific documentation
> --
>
> Key: STORM-1617
> URL: https://issues.apache.org/jira/browse/STORM-1617
> Project: Apache Storm
>  Issue Type: Bug
>  Components: asf-site
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans
>
> Our current documentation on http://storm.apache.org/ has no place to put 
> release-specific documentation.  Every other project I know of has a little 
> bit of generic information, but most of the site is tied to a specific 
> release.  I would like to copy that model, and I propose that the following 
> files be maintained only on the asf-site branch.  All other files will be 
> moved to a documentation directory on the individual branches, and a copy of 
> each, along with the generated javadocs, will be copied to a release-specific 
> directory for each release.
> ./index.html
> ./releases.html (Combined downloads + documentation for each release)
> ./contribute/BYLAWS.md
> ./contribute/Contributing-to-Storm.md
> ./contribute/People.md (Perhaps just PMC for this)
> ./_posts
> ./news.html
> ./feed.xml
> ./getting-help.html
> ./LICENSE.html (Apache License)
> ./talksAndVideos.md
> Possibly also:
> ./about/deployment.md
> ./about/fault-tolerant.md
> ./about/free-and-open-source.md
> ./about/guarantees-data-processing.md
> ./about/integrates.md
> ./about/multi-language.md
> ./about/scalable.md
> ./about/simple-api.md
> ./about.md





[GitHub] storm pull request: STORM-1616: Add RAS API for Trident

2016-03-14 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/1199#discussion_r56084445
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -124,6 +124,31 @@ public Stream parallelismHint(int hint) {
 }
 
 /**
+ * Sets the CPU Load resource for the current node
+ */
+public Stream setCPULoad(Number load) {
--- End diff --

+1 arunmahadevan. I agree. 




[jira] [Commented] (STORM-1616) Add RAS API for Trident

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15194253#comment-15194253
 ] 

ASF GitHub Bot commented on STORM-1616:
---

Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/1199#discussion_r56084445
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -124,6 +124,31 @@ public Stream parallelismHint(int hint) {
 }
 
 /**
+ * Sets the CPU Load resource for the current node
+ */
+public Stream setCPULoad(Number load) {
--- End diff --

+1 arunmahadevan. I agree. 


> Add RAS API for Trident
> ---
>
> Key: STORM-1616
> URL: https://issues.apache.org/jira/browse/STORM-1616
> Project: Apache Storm
>  Issue Type: Bug
>Reporter: Kyle Nusbaum
>Assignee: Kyle Nusbaum
>






[GitHub] storm pull request: STORM-1616: Add RAS API for Trident

2016-03-14 Thread knusbaum
Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/1199#issuecomment-196548786
  
 arunmahadevan added a note 3 days ago

> Actually naming the operations wherever specific config needs to be 
applied looks cleaner and it separates out the streaming operations from the 
config specification.

>
```
TridentTopology topo = new TridentTopology();
TridentState wordCounts = 
topology.newStream("spout1", spout)
.name("spout1")
.each(new Fields("sentence"), new Split(), new Fields("word"))
.name("split_operation")
.groupBy(new Fields("word"))
.persistentAggregate(new MemorymapState.Factory(), new Count(), new 
Fields("count"))
.name("aggregate");
```
>And then
```
topology.setConfig("spout1", Config.cpuLoad(20).memoryLoad(1024))
 .setConfig("split_operation", 
Config.cpuLoad(20).memoryLoad(256))
 .setConfig("aggregate", Config.parallelismHint(6));
```
>The concern with directly adding each config api in Stream is that we 
might want to add more configs in future and then it would mess up the Stream 
api.'


This does not work, and would require an overhaul of the way naming works 
right now. First off, your example won't compile. `persistentAggregate` returns 
a `TridentState`, which doesn't have a `name` method. Okay, well we can move 
the name call up to just below the groupBy. It's a little confusing being right 
in the middle, but groupBy returns a `GroupedStream`, which has a name method. 
Unfortunately, you end up with all your bolts being named "b-*-aggregate" 
because 'GroupedStream' just renames the stream before it. In fact, except for 
differentiating between the spout and body, I don't see any existing way to 
give different sections different names, and doing so is not a simple change, 
and not one that is going to be backwards-compatible.

I agree with your point in principle, but how much do we want to change the 
existing API to accommodate it?




[jira] [Commented] (STORM-1616) Add RAS API for Trident

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15194300#comment-15194300
 ] 

ASF GitHub Bot commented on STORM-1616:
---

Github user knusbaum commented on the pull request:

https://github.com/apache/storm/pull/1199#issuecomment-196548786
  
 arunmahadevan added a note 3 days ago

> Actually naming the operations wherever specific config needs to be 
applied looks cleaner and it separates out the streaming operations from the 
config specification.

>
```
TridentTopology topo = new TridentTopology();
TridentState wordCounts = 
topology.newStream("spout1", spout)
.name("spout1")
.each(new Fields("sentence"), new Split(), new Fields("word"))
.name("split_operation")
.groupBy(new Fields("word"))
.persistentAggregate(new MemorymapState.Factory(), new Count(), new 
Fields("count"))
.name("aggregate");
```
>And then
```
topology.setConfig("spout1", Config.cpuLoad(20).memoryLoad(1024))
 .setConfig("split_operation", 
Config.cpuLoad(20).memoryLoad(256))
 .setConfig("aggregate", Config.parallelismHint(6));
```
>The concern with directly adding each config api in Stream is that we 
might want to add more configs in future and then it would mess up the Stream 
api.'


This does not work, and would require an overhaul of the way naming works 
right now. First off, your example won't compile. `persistentAggregate` returns 
a `TridentState`, which doesn't have a `name` method. Okay, well we can move 
the name call up to just below the groupBy. It's a little confusing being right 
in the middle, but groupBy returns a `GroupedStream`, which has a name method. 
Unfortunately, you end up with all your bolts being named "b-*-aggregate" 
because 'GroupedStream' just renames the stream before it. In fact, except for 
differentiating between the spout and body, I don't see any existing way to 
give different sections different names, and doing so is not a simple change, 
and not one that is going to be backwards-compatible.

I agree with your point in principle, but how much do we want to change the 
existing API to accommodate it?


> Add RAS API for Trident
> ---
>
> Key: STORM-1616
> URL: https://issues.apache.org/jira/browse/STORM-1616
> Project: Apache Storm
>  Issue Type: Bug
>Reporter: Kyle Nusbaum
>Assignee: Kyle Nusbaum
>






[GitHub] storm pull request: [STORM-1624] add maven central status in READM...

2016-03-14 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1212#issuecomment-196584158
  
+1




[jira] [Commented] (STORM-1624) add maven central status in README

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15194445#comment-15194445
 ] 

ASF GitHub Bot commented on STORM-1624:
---

Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1212#issuecomment-196584158
  
+1


> add maven central status in README
> --
>
> Key: STORM-1624
> URL: https://issues.apache.org/jira/browse/STORM-1624
> Project: Apache Storm
>  Issue Type: Improvement
>Reporter: Xin Wang
>Assignee: Xin Wang
>






[GitHub] storm pull request: STORM-1616: Add RAS API for Trident

2016-03-14 Thread ptgoetz
Github user ptgoetz commented on the pull request:

https://github.com/apache/storm/pull/1199#issuecomment-196585455
  
I lean toward making this part of the Stream API, just like 
parallelismHint(). In both cases it is about requesting cluster resources 
(i.e., threads/CPU/memory).

I'm strongly against making this a topology-level API as proposed earlier 
(though I am open to new ideas). That approach involves a lot of "magic 
strings" and feels to me like a violation of the Trident API. If there's a 
better way to do it, I'm open to it, but let's not use strings as identifiers.
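
For reference, a minimal sketch of the Stream-API direction (setCPULoad is the method added in this PR's diff; the spout argument and the numbers are placeholders):

```
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;

public class TridentRasSketch {
    // Resource requests are chained on the stream, in the same style as parallelismHint().
    static Stream build(IRichSpout spout) {
        TridentTopology topology = new TridentTopology();
        return topology.newStream("spout1", spout)
                .parallelismHint(2)
                .setCPULoad(20); // CPU points for this node, per Stream.setCPULoad(Number)
    }
}
```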




[jira] [Commented] (STORM-1616) Add RAS API for Trident

2016-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15194462#comment-15194462
 ] 

ASF GitHub Bot commented on STORM-1616:
---

Github user ptgoetz commented on the pull request:

https://github.com/apache/storm/pull/1199#issuecomment-196585455
  
I lean toward making this part of the Stream API, just like 
parallelismHint(). In both cases it is about requesting cluster resources 
(i.e., threads/CPU/memory).

I'm strongly against making this a topology-level API as proposed earlier 
(though I am open to new ideas). That approach involves a lot of "magic 
strings" and feels to me like a violation of the Trident API. If there's a 
better way to do it, I'm open to it, but let's not use strings as identifiers.


> Add RAS API for Trident
> ---
>
> Key: STORM-1616
> URL: https://issues.apache.org/jira/browse/STORM-1616
> Project: Apache Storm
>  Issue Type: Bug
>Reporter: Kyle Nusbaum
>Assignee: Kyle Nusbaum
>





