[GitHub] storm issue #2783: [WIP] Make StormMetricsRegistry a regular class rather th...

2018-08-03 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2783
  
I think it depends a lot on what kind of static method we're talking about. 
If the method is internal we can just change it and inject the metric, or even 
better make the method not static and inject the metrics or other dependencies 
via the constructor. This is sort of what happened with the memory tracking in 
Container.

I see your point about publicly accessible static methods, though. I'm not 
sure we can do anything to work around it there.
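
A minimal sketch of the internal-method case, assuming a stand-in `SimpleMeter` 
instead of the Dropwizard `Meter`. The class and method names here are 
hypothetical and are not taken from the PR. It contrasts a static helper that 
reaches a static metric with an instance method that receives the metric via 
the constructor:

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for a Dropwizard Meter; it only counts events.
final class SimpleMeter {
    private final LongAdder count = new LongAdder();

    void mark() {
        count.increment();
    }

    long getCount() {
        return count.sum();
    }
}

// Before: a static method can only reach the metric through static state.
final class StaticLauncher {
    static final SimpleMeter LAUNCH_FAILURES = new SimpleMeter();

    static void launch(Runnable task) {
        try {
            task.run();
        } catch (RuntimeException e) {
            LAUNCH_FAILURES.mark();
        }
    }
}

// After: the method is an instance method and the metric is injected.
final class InjectedLauncher {
    private final SimpleMeter launchFailures;

    InjectedLauncher(SimpleMeter launchFailures) {
        this.launchFailures = launchFailures;
    }

    void launch(Runnable task) {
        try {
            task.run();
        } catch (RuntimeException e) {
            launchFailures.mark();
        }
    }
}
```

With the injected variant, a test can hand in its own meter and inspect it 
afterwards without touching any global state.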


---


[GitHub] storm issue #2783: [WIP] Make StormMetricsRegistry a regular class rather th...

2018-08-03 Thread zd-project
Github user zd-project commented on the issue:

https://github.com/apache/storm/pull/2783
  
Additionally, it still seems less than ideal that if we want to measure 
things inside a static method, we have to pass the metric in as a parameter. 


---


[GitHub] storm issue #2783: [WIP] Make StormMetricsRegistry a regular class rather th...

2018-08-03 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2783
  
@zd-project You are right that if someone wants to add metrics to a 
component, they will have to figure out how to get the registry injected. I 
can't say up front how hard that will be, except to note that it wasn't too bad 
to make these changes. 

For the specific case of the Container, I put the metrics in the 
ContainerMemoryTracker next to the fields they're gauges for. This is primarily 
because Container gets created a lot of times, and I didn't want to call the 
`gauge` method every time the constructor is invoked. In this case I chose to 
move the gauges to a different class with a different lifecycle from Container, 
but since the `gauge` method uses `getOrAdd` there probably wouldn't be 
anything wrong with registering the metrics every time a Container is created 
instead.

If I were adding new metrics to Container, I would either make a metrics 
container object like SlotMetrics that is common to all Containers and pass 
that in, or pass in the registry and just invoke the registration every time 
the constructor is invoked. Unless there's a big performance penalty to using 
`getOrAdd`, I would think that there wouldn't be much difference.
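
The two options can be sketched roughly as follows. This assumes a tiny 
stand-in registry (`TinyRegistry`) whose `counter` method has get-or-add 
semantics, playing the role of the `getOrAdd`-based wrappers. All class and 
metric names below are hypothetical:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a registry with get-or-add semantics.
final class TinyRegistry {
    private final ConcurrentMap<String, AtomicLong> metrics = new ConcurrentHashMap<>();

    // Returns the existing metric for this name, or creates and stores a new one.
    AtomicLong counter(String name) {
        return metrics.computeIfAbsent(name, key -> new AtomicLong());
    }

    int size() {
        return metrics.size();
    }
}

// Option 1: a long-lived holder registers once and is shared by all containers.
final class MemoryMetrics {
    final AtomicLong usedMemoryMb;

    MemoryMetrics(TinyRegistry registry) {
        this.usedMemoryMb = registry.counter("supervisor:used-memory-mb");
    }
}

final class ContainerWithSharedMetrics {
    private final MemoryMetrics metrics;

    ContainerWithSharedMetrics(MemoryMetrics metrics) {
        this.metrics = metrics;
    }

    void reserve(long megabytes) {
        metrics.usedMemoryMb.addAndGet(megabytes);
    }
}

// Option 2: every container receives the registry and registers in its
// constructor; get-or-add makes the repeated registration harmless.
final class ContainerWithRegistry {
    ContainerWithRegistry(TinyRegistry registry) {
        registry.counter("supervisor:containers-created").incrementAndGet();
    }
}
```

In this sketch the only per-construction cost of the second option is one map 
lookup, which is the kind of overhead the `getOrAdd` question above is about.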


---


[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...

2018-08-03 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2771#discussion_r207672905
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java ---
@@ -12,28 +12,30 @@
 
 package org.apache.storm.metric;
 
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.Reservoir;
+
 import java.util.Map;
 import java.util.concurrent.Callable;
+
+import com.codahale.metrics.Timer;
+import org.apache.commons.lang.StringUtils;
 import org.apache.storm.daemon.metrics.MetricsUtils;
 import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@SuppressWarnings("unchecked")
-public class StormMetricsRegistry {
-private static final MetricRegistry DEFAULT_REGISTRY = new 
MetricRegistry();
+public class StormMetricsRegistry extends MetricRegistry {
--- End diff --

Yes, I agree that for the non-MetricSet metrics, we can just use the 
getOrAdd wrappers. If we don't need MetricSet with a non-static registry, we 
should be good if we merge the changes in 
https://github.com/apache/storm/pull/2783. 

I agree that we should upgrade, but versions past 4.x have removed the 
metrics-ganglia module. I'm not sure if it's been spun off somewhere, or if 
it's just been deleted, but I didn't want to start removing stuff related to 
Ganglia in https://github.com/apache/storm/pull/2783 as well. If we want to 
upgrade past 3.1 I think we should do it in a separate PR.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207671937
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -62,6 +62,7 @@
 private OutputCollector collector;
 private TupleToKafkaMapper mapper;
 private KafkaTopicSelector topicSelector;
+private Callback providedCallback;
--- End diff --

That looks good to me. I'm also not sure about the collector, but I'd maybe 
lean toward not including it. The callback won't have access to incoming 
tuples, so the only use would be if someone wanted the callback to emit a new 
unanchored tuple.

I wouldn't worry about people misusing the prepare method. They can just as 
easily override the bolt's prepare method. 


---


[GitHub] storm issue #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-03 Thread zd-project
Github user zd-project commented on the issue:

https://github.com/apache/storm/pull/2754
  
The IO exception metrics on the logviewer are kind of fuzzy, as I went through 
the call stack manually and eyeballed all thrown exceptions. I'm not sure 
whether they're complete or whether they will over-count. It'd be great if you 
could help review that part especially. 


---


[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...

2018-08-03 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2771#discussion_r207665264
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java ---
@@ -50,19 +61,19 @@ public static void startMetricsReporters(Map topoConf) {
 }
 }
 
-private static <T extends Metric> T register(final String name, T 
metric) {
-T ret;
+@Override
+//This is more similar to super#getOrAdd than super#register
+public <T extends Metric> T register(final String name, T metric) 
throws IllegalArgumentException {
--- End diff --

It is fine. But I would like to see more javadoc for this method.


---


[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...

2018-08-03 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2771#discussion_r207663344
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java ---
@@ -50,19 +61,19 @@ public static void startMetricsReporters(Map topoConf) {
 }
 }
 
-private static <T extends Metric> T register(final String name, T 
metric) {
-T ret;
+@Override
+//This is more similar to super#getOrAdd than super#register
+public <T extends Metric> T register(final String name, T metric) 
throws IllegalArgumentException {
--- End diff --

I am okay with it this way for now.


---


[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java

2018-08-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2752


---


[GitHub] storm issue #2752: Storm 1311 Migration of UI from clj to Java

2018-08-03 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2752
  
Please squash the commits.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207649108
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -62,6 +62,7 @@
 private OutputCollector collector;
 private TupleToKafkaMapper mapper;
 private KafkaTopicSelector topicSelector;
+private Callback providedCallback;
--- End diff --

What about something like this? Not sure if `OutputCollector` is necessary 
or not.

```java
public interface PreparedCallback extends Callback, Serializable {
void prepare(Map topoConf, TopologyContext context, 
OutputCollector outputCollector);
}
```

So if the `PreparedCallback` is not null, then KafkaBolt would call 
`.prepare()` during `KafkaBolt.prepare()`.

The one concern I have is that it might be open for abuse by tempting 
people to use it to inject logic into the bolt preparation that's unrelated to 
Kafka publishing.
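
A rough sketch of how the bolt side could wire this up. It uses stand-in types 
so that it compiles on its own: `SendCallback` replaces Kafka's `Callback` 
(whose real `onCompletion` takes a `RecordMetadata`), and Storm's context type 
is replaced by `Object`. `SketchBolt`, `RecordingCallback` and 
`onSendCompleted` are hypothetical names, and the `instanceof` check is just 
one possible way to express "call `.prepare()` if a prepared callback was 
provided":

```java
import java.io.Serializable;
import java.util.Map;

// Stand-in for Kafka's producer Callback interface.
interface SendCallback {
    void onCompletion(Object metadata, Exception exception);
}

// Stand-in for the proposed PreparedCallback, without the collector argument.
interface PreparedSendCallback extends SendCallback, Serializable {
    void prepare(Map<String, Object> topoConf, Object context);
}

// Example user callback that records what happened to it.
final class RecordingCallback implements PreparedSendCallback {
    private static final long serialVersionUID = 1L;
    private boolean prepared;
    private int completions;

    @Override
    public void prepare(Map<String, Object> topoConf, Object context) {
        prepared = true;
    }

    @Override
    public void onCompletion(Object metadata, Exception exception) {
        completions++;
    }

    boolean isPrepared() {
        return prepared;
    }

    int getCompletions() {
        return completions;
    }
}

// Stand-in for the bolt: prepares the callback only if it asks for it.
final class SketchBolt {
    private final SendCallback providedCallback; // may be null

    SketchBolt(SendCallback providedCallback) {
        this.providedCallback = providedCallback;
    }

    void prepare(Map<String, Object> topoConf, Object context) {
        // instanceof is false for null, so no separate null check is needed.
        if (providedCallback instanceof PreparedSendCallback) {
            ((PreparedSendCallback) providedCallback).prepare(topoConf, context);
        }
    }

    void onSendCompleted(Object metadata, Exception exception) {
        if (providedCallback != null) {
            providedCallback.onCompletion(metadata, exception);
        }
    }
}
```

Plain callbacks and a missing callback keep working unchanged in this sketch; 
only callbacks that implement the extended interface see the topology 
configuration.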


---


[GitHub] storm issue #2710: STORM-3099: Extend metrics on supervisor, workers and DRP...

2018-08-03 Thread zd-project
Github user zd-project commented on the issue:

https://github.com/apache/storm/pull/2710
  
Should I squash to one per daemon?


---


[GitHub] storm pull request #2710: STORM-3099: Extend metrics on supervisor, workers ...

2018-08-03 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2710#discussion_r207645153
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
 ---
@@ -28,11 +29,14 @@
 import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ShellUtils;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ClientSupervisorUtils {
+public static final Meter numWorkerLaunchExceptions = 
ShellUtils.numShellExceptions;
--- End diff --

Logged in Jira


---


[GitHub] storm issue #2783: [WIP] Make StormMetricsRegistry a regular class rather th...

2018-08-03 Thread zd-project
Github user zd-project commented on the issue:

https://github.com/apache/storm/pull/2783
  
Extensibility and centralized management would be my concerns. I think this 
PR has improved the centralized management of metrics. For example, I think 
the SlotMetrics class is a great example of this (it's actually a very similar 
idea to the MetricSet I've put in). It's easier to tell this way what we have 
been tracking for a certain daemon and how we're tracking it.

But I'm not quite sure about the extensibility part, i.e., what I should do 
if I want to have metrics in a new component or one that hasn't had a registry 
injected yet. For example, in my PR I added a few metrics in Container, but I 
don't think I see a way to add them here since Container doesn't have access to 
StormMetricsRegistry. We might need to do some refactoring before we can 
actually add in metrics.


---


[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...

2018-08-03 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2771#discussion_r207640687
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java ---
@@ -12,28 +12,30 @@
 
 package org.apache.storm.metric;
 
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.Reservoir;
+
 import java.util.Map;
 import java.util.concurrent.Callable;
+
+import com.codahale.metrics.Timer;
+import org.apache.commons.lang.StringUtils;
 import org.apache.storm.daemon.metrics.MetricsUtils;
 import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@SuppressWarnings("unchecked")
-public class StormMetricsRegistry {
-private static final MetricRegistry DEFAULT_REGISTRY = new 
MetricRegistry();
+public class StormMetricsRegistry extends MetricRegistry {
--- End diff --

Our customization of MetricRegistry is actually very similar to the methods 
that wrap `#getOrAdd`, such as `gauge`, `timer`, `meter`, and `histogram`. 
Those do not provide a way to eliminate double registration of a MetricSet, 
though we could avoid the problem altogether with a non-static registry. 

In addition, I think we should probably upgrade to a newer version of 
Dropwizard, since the current version (3.1.0) is about to be EOL. Their 4.x has 
a lot of improvements and provides more features on top of Java 8.


---


[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...

2018-08-03 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2771#discussion_r207638261
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java ---
@@ -12,28 +12,30 @@
 
 package org.apache.storm.metric;
 
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.Reservoir;
+
 import java.util.Map;
 import java.util.concurrent.Callable;
+
+import com.codahale.metrics.Timer;
+import org.apache.commons.lang.StringUtils;
 import org.apache.storm.daemon.metrics.MetricsUtils;
 import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@SuppressWarnings("unchecked")
-public class StormMetricsRegistry {
-private static final MetricRegistry DEFAULT_REGISTRY = new 
MetricRegistry();
+public class StormMetricsRegistry extends MetricRegistry {
--- End diff --

I'll take a look at this


---


[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...

2018-08-03 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2771#discussion_r207636496
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java ---
@@ -50,19 +61,19 @@ public static void startMetricsReporters(Map topoConf) {
 }
 }
 
-private static <T extends Metric> T register(final String name, T 
metric) {
-T ret;
+@Override
+//This is more similar to super#getOrAdd than super#register
+public <T extends Metric> T register(final String name, T metric) 
throws IllegalArgumentException {
--- End diff --

I added a test for registerMetricSet and unregisterMetricSet in #2754 and 
#2764 to show that this method solves the issue of double registration. See 
https://github.com/apache/storm/pull/2754/commits/597c6bb2d41a7aa0d25c6aab9201b451f6b1eaf1

I don't know if we should move the changes to `registerMetricSet` and 
`unregisterMetricSet` back to this PR, but their absence seems to be confusing 
people about the purpose of this change. @Ethanlm @srdo @revans2 


---


[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...

2018-08-03 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2771#discussion_r207601720
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java ---
@@ -12,28 +12,30 @@
 
 package org.apache.storm.metric;
 
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.Reservoir;
+
 import java.util.Map;
 import java.util.concurrent.Callable;
+
+import com.codahale.metrics.Timer;
+import org.apache.commons.lang.StringUtils;
 import org.apache.storm.daemon.metrics.MetricsUtils;
 import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@SuppressWarnings("unchecked")
-public class StormMetricsRegistry {
-private static final MetricRegistry DEFAULT_REGISTRY = new 
MetricRegistry();
+public class StormMetricsRegistry extends MetricRegistry {
--- End diff --

I think we can make the registry non-static without it being a huge hassle; 
see https://github.com/apache/storm/pull/2783. I'll rebase it once 
https://github.com/apache/storm/pull/2752 goes in, and hopefully we can use it 
to resolve https://github.com/apache/storm/pull/2714.

Regarding the subclassing, I'm wondering if we should instead go ask the 
Dropwizard guys whether the inability to use `getOrAdd` for MetricSets is 
intended, and if not whether they'd be open to adding a method to 
MetricRegistry to allow it. If we're hitting this issue, other people probably 
are too, and I'd much rather fix it in the library than try to hack around it 
here.

What do you think @zd-project?


---


[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...

2018-08-03 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2771#discussion_r207599113
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java ---
@@ -12,28 +12,30 @@
 
 package org.apache.storm.metric;
 
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.Reservoir;
+
 import java.util.Map;
 import java.util.concurrent.Callable;
+
+import com.codahale.metrics.Timer;
+import org.apache.commons.lang.StringUtils;
 import org.apache.storm.daemon.metrics.MetricsUtils;
 import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@SuppressWarnings("unchecked")
-public class StormMetricsRegistry {
-private static final MetricRegistry DEFAULT_REGISTRY = new 
MetricRegistry();
+public class StormMetricsRegistry extends MetricRegistry {
--- End diff --

The reason I changed the static `register()` to an instance `register()` 
method is stated here. I should also point out that because 
StormMetricsRegistry is a static singleton, it persists across unit tests. So 
if a unit test class has multiple tests in it, all metrics will actually be 
registered multiple times. We either have to change all unit tests to have set 
up and tear down, or we have to completely revamp the StormMetricsRegistry 
class to make it non-static. As I don't have enough time to do either, I came 
up with this workaround. @srdo @revans2 @Ethanlm 
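
To illustrate the problem, here is a small sketch that uses a stand-in 
registry rather than the real Dropwizard class. `StrictRegistry`, 
`ComponentUnderTest` and the metric name are hypothetical. The `register` 
method mimics the strict behaviour of rejecting a second registration under 
the same name, and `getOrAdd` mimics the workaround of returning the metric 
that is already there:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a metrics registry.
final class StrictRegistry {
    private final ConcurrentMap<String, Object> metrics = new ConcurrentHashMap<>();

    // Strict registration: a second metric under the same name is an error.
    <T> T register(String name, T metric) {
        if (metrics.putIfAbsent(name, metric) != null) {
            throw new IllegalArgumentException("A metric named " + name + " already exists");
        }
        return metric;
    }

    // Lenient registration: hand back whatever is already registered.
    @SuppressWarnings("unchecked")
    <T> T getOrAdd(String name, T metric) {
        Object existing = metrics.putIfAbsent(name, metric);
        return existing != null ? (T) existing : metric;
    }
}

// A component that registers its metric when it is constructed.
final class ComponentUnderTest {
    final AtomicLong requests;

    ComponentUnderTest(StrictRegistry registry) {
        this.requests = registry.register("component:requests", new AtomicLong());
    }
}
```

If every test in a class constructs `ComponentUnderTest` against one shared 
registry, the second test fails in the constructor. Giving each test its own 
registry instance, or registering through `getOrAdd`, avoids that.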


---


[GitHub] storm pull request #2788: STORM-3170: Fixed bug to eliminate invalid file de...

2018-08-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2788


---


[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...

2018-08-03 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2771#discussion_r207594931
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java ---
@@ -50,19 +61,19 @@ public static void startMetricsReporters(Map topoConf) {
 }
 }
 
-private static <T extends Metric> T register(final String name, T 
metric) {
-T ret;
+@Override
+//This is more similar to super#getOrAdd than super#register
+public <T extends Metric> T register(final String name, T metric) 
throws IllegalArgumentException {
--- End diff --

If the metric is a MetricSet, it will have the same issue as the old 
code.


---


[GitHub] storm issue #2788: STORM-3170: Fixed bug to eliminate invalid file deletion

2018-08-03 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2788
  
Still +1


---


[GitHub] storm pull request #2788: STORM-3170: Fixed bug to eliminate invalid file de...

2018-08-03 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2788#discussion_r207587787
  
--- Diff: 
storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/testsupport/MockRemovableFileBuilder.java
 ---
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.
+ * See the NOTICE file distributed with this work for additional 
information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 
(the "License");
+ * you may not use this file except in compliance with the License.  You 
may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS IS" 
BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and 
limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.testsupport;
+
+import org.mockito.Mockito;
+
+import java.io.File;
+
+public class MockRemovableFileBuilder extends MockFileBuilder {
+@Override
+public File build() {
+File mockFile = super.build();
+Mockito.when(mockFile.delete()).thenReturn(true);
--- End diff --

Okay I'll just file a minor jira then


---


[GitHub] storm pull request #2788: STORM-3170: Fixed bug to eliminate invalid file de...

2018-08-03 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2788#discussion_r207587402
  
--- Diff: 
storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/testsupport/MockRemovableFileBuilder.java
 ---
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.
+ * See the NOTICE file distributed with this work for additional 
information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 
(the "License");
+ * you may not use this file except in compliance with the License.  You 
may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS IS" 
BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and 
limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.testsupport;
+
+import org.mockito.Mockito;
+
+import java.io.File;
+
+public class MockRemovableFileBuilder extends MockFileBuilder {
+@Override
+public File build() {
+File mockFile = super.build();
+Mockito.when(mockFile.delete()).thenReturn(true);
--- End diff --

No, it's just me nitpicking. We can always replace it if it becomes an 
issue to use in tests.


---


[GitHub] storm pull request #2787: STORM-3169: Correctly convert configured minutes t...

2018-08-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2787


---


[GitHub] storm pull request #2785: STORM-3167: Copy map before returning in FakeMetri...

2018-08-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2785


---


[GitHub] storm pull request #2784: STORM-3166: Make Utils.threadDump account for thre...

2018-08-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2784


---


[GitHub] storm pull request #2788: STORM-3170: Fixed bug to eliminate invalid file de...

2018-08-03 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2788#discussion_r207586437
  
--- Diff: 
storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/testsupport/MockRemovableFileBuilder.java
 ---
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.
+ * See the NOTICE file distributed with this work for additional 
information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 
(the "License");
+ * you may not use this file except in compliance with the License.  You 
may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS IS" 
BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and 
limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.testsupport;
+
+import org.mockito.Mockito;
+
+import java.io.File;
+
+public class MockRemovableFileBuilder extends MockFileBuilder {
+@Override
+public File build() {
+File mockFile = super.build();
+Mockito.when(mockFile.delete()).thenReturn(true);
--- End diff --

Should I still be concerned about this issue, then? @revans2 @srdo 


---


[GitHub] storm issue #2787: STORM-3169: Correctly convert configured minutes to milli...

2018-08-03 Thread zd-project
Github user zd-project commented on the issue:

https://github.com/apache/storm/pull/2787
  
Squashed.


---


[GitHub] storm issue #2788: STORM-3170: Fixed bug to eliminate invalid file deletion

2018-08-03 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2788
  
+1, thanks for fixing the issue with non-deletable files. The solution 
looks good. Please squash and we can merge.


---


[GitHub] storm issue #2787: STORM-3169: Correctly convert configured minutes to milli...

2018-08-03 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2787
  
+1, thanks for the quick fix. Please squash and we can merge.


---


[GitHub] storm pull request #2786: STORM-3168 prevent AsyncLocalizer cleanup from cra...

2018-08-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2786


---


[GitHub] storm issue #2710: STORM-3099: Extend metrics on supervisor, workers and DRP...

2018-08-03 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2710
  
Also could we squash the commits some?


---


[GitHub] storm pull request #2710: STORM-3099: Extend metrics on supervisor, workers ...

2018-08-03 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2710#discussion_r207578433
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java ---
@@ -1147,8 +1162,19 @@ public DynamicState 
withPendingLocalization(Future pendingDownload) {
  */
 public DynamicState withState(final MachineState state) {
 long newStartTime = Time.currentTimeMillis();
+//TODO: potential lost metrics due to timing accuracy (Timer 
only tracks one call per millisecond)
--- End diff --

Please don't leave TODOs in the code. Either fix it, file a follow-up JIRA 
to fix it, or accept it and just have it be a comment and not a TODO.


---


[GitHub] storm pull request #2710: STORM-3099: Extend metrics on supervisor, workers ...

2018-08-03 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2710#discussion_r207576736
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
 ---
@@ -28,11 +29,14 @@
 import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ShellUtils;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ClientSupervisorUtils {
+public static final Meter numWorkerLaunchExceptions = 
ShellUtils.numShellExceptions;
--- End diff --

This all feels a bit too confusing to me.

`ShellUtils.numShellExceptions` is static and pulled in from multiple 
different locations, but only registered once in the supervisor. I personally 
would rather have `ClientSupervisorUtils` register the Meter itself; since it 
is tied to the supervisor, having it do the registration here is fine.

For ShellUtils, I would like to see it not have a static meter, but instead 
optionally pass a meter in when calling run, or perhaps optionally include it 
in the constructor. 
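
A minimal sketch of the constructor variant, assuming a stand-in 
`FailureMeter` instead of the Dropwizard `Meter`. `ShellRunner` and its 
methods are hypothetical names and do not reflect the actual `ShellUtils` 
API:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for a Dropwizard Meter; it only counts events.
final class FailureMeter {
    private final LongAdder count = new LongAdder();

    void mark() {
        count.increment();
    }

    long getCount() {
        return count.sum();
    }
}

// Stand-in for a shell helper that takes its meter as an optional dependency.
final class ShellRunner {
    private final FailureMeter failures; // null when the caller tracks nothing

    ShellRunner() {
        this(null);
    }

    ShellRunner(FailureMeter failures) {
        this.failures = failures;
    }

    // Runs the command, marks the meter on failure, and rethrows the exception.
    <T> T run(Callable<T> command) throws Exception {
        try {
            return command.call();
        } catch (Exception e) {
            if (failures != null) {
                failures.mark();
            }
            throw e;
        }
    }
}
```

In this shape the supervisor would create and register the meter itself and 
pass it in, while callers that don't care about the metric use the no-argument 
constructor.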


---


[GitHub] storm issue #2771: STORM-3157: General improvement to StormMetricsRegistry

2018-08-03 Thread zd-project
Github user zd-project commented on the issue:

https://github.com/apache/storm/pull/2771
  
`name()` has been removed from this PR.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207560866
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -129,6 +140,27 @@ public void prepare(Map topoConf, 
TopologyContext context, Outpu
 return new KafkaProducer<>(props);
 }
 
+/**
+ * Make the producer Callback. Using this Callback will also execute 
the user defined Callback, if provided.
+ */
+protected Callback mkProducerCallback(final Tuple input) {
--- End diff --

Fair point. I'll change it to `private`.


---


[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java

2018-08-03 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2752#discussion_r207554122
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java ---
@@ -0,0 +1,1941 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF)
+ * under one or more contributor license agreements.
+ * See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance 
with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS IS" 
BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.daemon.ui;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import javax.servlet.DispatcherType;
+import javax.servlet.Servlet;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.SecurityContext;
+
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.BoltAggregateStats;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.CommonAggregateStats;
+import org.apache.storm.generated.ComponentAggregateStats;
+import org.apache.storm.generated.ComponentPageInfo;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.ExecutorSummary;
+import org.apache.storm.generated.GetInfoOptions;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.KillOptions;
+import org.apache.storm.generated.LogConfig;
+import org.apache.storm.generated.LogLevel;
+import org.apache.storm.generated.LogLevelAction;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.NimbusSummary;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.NumErrorsChoice;
+import org.apache.storm.generated.OwnerResourceSummary;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.RebalanceOptions;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.SupervisorPageInfo;
+import org.apache.storm.generated.SupervisorSummary;
+import org.apache.storm.generated.TopologyHistoryInfo;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologyPageInfo;
+import org.apache.storm.generated.TopologyStats;
+import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.generated.WorkerSummary;
+import org.apache.storm.logging.filters.AccessLoggingFilter;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.thrift.TException;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.TopologySpoutLag;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.VersionInfo;
+import org.apache.storm.utils.WebAppUtils;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
+import org.eclipse.jetty.servlet.FilterHolder;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.servlets.CrossOriginFilter;
+import org.eclipse.jetty.util.ssl.SslContextFactory;

[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java

2018-08-03 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2752#discussion_r207551893
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/ui/filters/AuthorizedUserFilter.java
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.ui.filters;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.security.Principal;
+import java.util.Map;
+import javax.ws.rs.container.ContainerRequestContext;
+import javax.ws.rs.container.ContainerRequestFilter;
+import javax.ws.rs.container.ResourceInfo;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.ui.resources.NimbusOp;
+import org.apache.storm.security.auth.IAuthorizer;
+import org.apache.storm.security.auth.ReqContext;
+import org.apache.storm.thrift.TException;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Utils;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Provider
+public class AuthorizedUserFilter implements ContainerRequestFilter {
+
+public static final Logger LOG = 
LoggerFactory.getLogger(AuthorizedUserFilter.class);
+public static Map conf = Utils.readStormConfig();
+public static IAuthorizer uiImpersonationHandler;
+public static IAuthorizer uiAclHandler;
+
+@Context private ResourceInfo resourceInfo;
+
+static {
+try {
+uiImpersonationHandler = StormCommon.mkAuthorizationHandler(
+(String) 
conf.get(DaemonConfig.NIMBUS_IMPERSONATION_AUTHORIZER), conf);
+uiAclHandler = StormCommon.mkAuthorizationHandler(
+(String) conf.get(DaemonConfig.NIMBUS_AUTHORIZER), 
conf);
+} catch (IllegalAccessException | InstantiationException | 
ClassNotFoundException e) {
+LOG.error("Error initializing AuthorizedUserFilter: ", e);
+throw new RuntimeException(e);
+}
+}
+
+@Override
+public void filter(ContainerRequestContext containerRequestContext) 
throws IOException {
+NimbusOp annotation = 
resourceInfo.getResourceMethod().getAnnotation(NimbusOp.class);
+if (annotation == null) {
+return;
+}
+String op = annotation.value();
+if (op == null) {
+return;
+}
+
+Map topoConf = null;
+if 
(containerRequestContext.getUriInfo().getPathParameters().containsKey("id")) {
+NimbusClient nimbusClient = 
NimbusClient.getConfiguredClient(Utils.readStormConfig());
+try {
+topoConf = (Map) 
JSONValue.parse(nimbusClient.getClient().getTopologyConf(
+
containerRequestContext.getUriInfo().getPathParameters().get("id").get(0)));
+} catch (TException e) {
+e.printStackTrace();
--- End diff --

This appears to be the part that is causing the issues.  Printing the 
stack trace is not logging it, so it will not show up where we want it.  And if 
we are handling it properly it will not matter.  An IOException is not going to 
be mapped properly into the result we want.  Also, all errors we don't expect 
(500 errors) will be returned as HTML.  Can we have a generic mapper that takes 
any Throwable and turns it into a JSON response?  At the same time, can we 
change this IOException so it has e as the cause, and have the generic exception 
mapper look for a cause such as an authorization exception and map it to the 
proper response?
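
A minimal sketch of the idea in this comment, using only the JDK: wrap the Thrift failure so the original exception is kept as the cause, then let one generic handler walk the cause chain and pick a status code and JSON body. The exception classes, the status choices (403, 404), and the JSON shape are stand-ins for illustration, not Storm's actual types; in the webapp the same logic would presumably sit in a JAX-RS `ExceptionMapper<Throwable>`.

```java
import java.io.IOException;

final class ErrorMappingSketch {

    // Hypothetical stand-ins for the Thrift-generated exceptions.
    static class NotAliveException extends Exception {
        NotAliveException(String msg) { super(msg); }
    }

    static class AuthorizationException extends Exception {
        AuthorizationException(String msg) { super(msg); }
    }

    // Wrap the failure but keep the original exception as the cause.
    static IOException wrap(String topoId, Exception cause) {
        return new IOException("Unable to fetch topo conf for topo id " + topoId, cause);
    }

    // Walk the cause chain; the first recognised type decides the status.
    static int statusFor(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof AuthorizationException) {
                return 403; // assumed mapping
            }
            if (c instanceof NotAliveException) {
                return 404; // assumed mapping
            }
        }
        return 500; // anything unexpected
    }

    // Render every error as JSON instead of the container's HTML error page.
    static String toJson(Throwable t) {
        String msg = t.getMessage() == null ? t.getClass().getSimpleName() : t.getMessage();
        return "{\"error\":\"" + escape(msg) + "\",\"errorCode\":" + statusFor(t) + "}";
    }

    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        Throwable t = wrap("wc-1-1533264135", new NotAliveException("wc-1-1533264135"));
        System.out.println(toJson(t));
    }
}
```

Because the wrapper keeps `e` as the cause, the handler can still tell a missing topology from an authorization failure even though both surface as an `IOException`.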


---


[GitHub] storm issue #2752: Storm 1311 Migration of UI from clj to Java

2018-08-03 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2752
  
I may have found out more.  In the logs I see 

```
Caused by: java.io.IOException: Unable to fetch topo conf for topo id 
wc-1-1533264135
at 
org.apache.storm.daemon.ui.filters.AuthorizedUserFilter.filter(AuthorizedUserFilter.java:85)
 ~[storm-webapp-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at 
org.glassfish.jersey.server.ContainerFilteringStage.apply(ContainerFilteringStage.java:132)
 ~[jersey-server-2.27.jar:?]
```

So it looks like the authorizer is throwing an IOException, which is causing 
the 500 error.


---


[GitHub] storm issue #2752: Storm 1311 Migration of UI from clj to Java

2018-08-03 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2752
  
I am seeing these when I run DRPC and the logviewer.  They appear to still 
work so this is minor, but it would be nice to fix them in a follow on JIRA if 
you cannot do it quickly now.

```
WARNING: A provider 
org.apache.storm.daemon.logviewer.webapp.LogviewerResource registered in SERVER 
runtime does not implement any provider interfaces applicable in the SERVER 
runtime. Due to constraint configuration problems the provider 
org.apache.storm.daemon.logviewer.webapp.LogviewerResource will be ignored.
```

```
WARNING: A provider org.apache.storm.daemon.drpc.webapp.DRPCResource 
registered in SERVER runtime does not implement any provider interfaces 
applicable in the SERVER runtime. Due to constraint configuration problems the 
provider org.apache.storm.daemon.drpc.webapp.DRPCResource will be ignored.
```

Also I found that the exception mapping does not appear to be working.  If 
you go to the topology page for a topology that is not up (e.g. 
http://localhost:8080/topology.html?id=wc-1-1533264135 ), a stack trace 
for the NotAliveException is printed to stdout or stderr (not sure which):

```
NotAliveException(msg:wc-1-1533264135)
at 
org.apache.storm.generated.Nimbus$getTopologyConf_result$getTopologyConf_resultStandardScheme.read(Nimbus.java:45081)
at 
org.apache.storm.generated.Nimbus$getTopologyConf_result$getTopologyConf_resultStandardScheme.read(Nimbus.java:45059)
at 
org.apache.storm.generated.Nimbus$getTopologyConf_result.read(Nimbus.java:44993)
...
```

But the request is returning an HTML response:

```



Error 500 Request failed.

HTTP ERROR 500
Problem accessing /api/v1/topology/wc-1-1533264135. Reason:
    Request failed.
Powered by Jetty:// 9.4.7.v20170914 (http://eclipse.org/jetty)



```


---


[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java

2018-08-03 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2752#discussion_r207542398
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerResource.java
 ---
@@ -137,7 +137,7 @@ public Response searchLogs(@Context HttpServletRequest 
request) throws IOExcepti
 String user = httpCredsHandler.getUserName(request);
 String topologyId = request.getParameter("topoId");
 String portStr = request.getParameter("port");
-String callback = request.getParameter("callback");
+String callback = request.getParameter("callbackParameterName");
--- End diff --

So why are we changing `/searchLogs?callback=bar` to 
`/searchLogs?callbackParameterName=bar`?  If that is what we are doing, it is a 
regression.  If not, I am really missing something.


---


[GitHub] storm pull request #2788: STORM-3170: Fixed bug to eliminate invalid file de...

2018-08-03 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2788#discussion_r207539609
  
--- Diff: 
storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/testsupport/MockRemovableFileBuilder.java
 ---
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.
+ * See the NOTICE file distributed with this work for additional 
information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 
(the "License");
+ * you may not use this file except in compliance with the License.  You 
may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS IS" 
BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and 
limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.testsupport;
+
+import org.mockito.Mockito;
+
+import java.io.File;
+
+public class MockRemovableFileBuilder extends MockFileBuilder {
+@Override
+public File build() {
+File mockFile = super.build();
+Mockito.when(mockFile.delete()).thenReturn(true);
--- End diff --

@zd-project you can control it more fully, but it gets a little more 
complicated.  Not a big deal but if it is causing problems it can be done.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207537361
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -62,6 +62,7 @@
 private OutputCollector collector;
 private TupleToKafkaMapper mapper;
 private KafkaTopicSelector topicSelector;
+private Callback providedCallback;
--- End diff --

Ah, good point. The workaround I was using prior to this was all done after 
serialization so this wasn't something I had to worry about. Let me think about 
what that interface should look like and I'll come up with something.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207534991
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -129,6 +140,27 @@ public void prepare(Map topoConf, 
TopologyContext context, Outpu
 return new KafkaProducer<>(props);
 }
 
+/**
+ * Make the producer Callback. Using this Callback will also execute 
the user defined Callback, if provided.
+ */
+protected Callback mkProducerCallback(final Tuple input) {
--- End diff --

I can change it to `private`.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207526335
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
@@ -37,38 +39,48 @@
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.MockUtil;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaBoltTest {
 private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBoltTest.class);
-
+
 @SuppressWarnings({ "unchecked", "serial" })
-@Test
-public void testSimple() {
-final KafkaProducer producer = 
mock(KafkaProducer.class);
-when(producer.send(any(), any())).thenAnswer(new Answer() {
+private  KafkaBolt makeBolt(KafkaProducer 
mockProducer) {
+Validate.isTrue(MockUtil.isMock(mockProducer), "KafkaProducer 
should be a mock object.");
+when(mockProducer.send(any(), any())).thenAnswer(new 
Answer() {
 @Override
 public Object answer(InvocationOnMock invocation) throws 
Throwable {
 Callback c = (Callback)invocation.getArguments()[1];
 c.onCompletion(null, null);
 return null;
 }
 });
-KafkaBolt bolt = new KafkaBolt() {
+KafkaBolt bolt = new KafkaBolt() {
 @Override
-protected KafkaProducer mkProducer(Properties 
props) {
-return producer;
+protected KafkaProducer mkProducer(Properties props) {
+return mockProducer;
 }
 };
 bolt.withTopicSelector("MY_TOPIC");
-
+
+return bolt;
+}
+
+@SuppressWarnings({ "unchecked", "serial" })
+@Test
+public void testSimple() {
+final KafkaProducer producer = 
mock(KafkaProducer.class);
--- End diff --

That would be fine.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207524658
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
@@ -37,38 +39,48 @@
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.MockUtil;
--- End diff --

Nice, didn't know about that one.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread dfdemar
Github user dfdemar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207524275
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
@@ -37,38 +39,48 @@
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.MockUtil;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaBoltTest {
 private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBoltTest.class);
-
+
 @SuppressWarnings({ "unchecked", "serial" })
-@Test
-public void testSimple() {
-final KafkaProducer producer = 
mock(KafkaProducer.class);
-when(producer.send(any(), any())).thenAnswer(new Answer() {
+private  KafkaBolt makeBolt(KafkaProducer 
mockProducer) {
+Validate.isTrue(MockUtil.isMock(mockProducer), "KafkaProducer 
should be a mock object.");
+when(mockProducer.send(any(), any())).thenAnswer(new 
Answer() {
 @Override
 public Object answer(InvocationOnMock invocation) throws 
Throwable {
 Callback c = (Callback)invocation.getArguments()[1];
 c.onCompletion(null, null);
 return null;
 }
 });
-KafkaBolt bolt = new KafkaBolt() {
+KafkaBolt bolt = new KafkaBolt() {
 @Override
-protected KafkaProducer mkProducer(Properties 
props) {
-return producer;
+protected KafkaProducer mkProducer(Properties props) {
+return mockProducer;
 }
 };
 bolt.withTopicSelector("MY_TOPIC");
-
+
+return bolt;
+}
+
+@SuppressWarnings({ "unchecked", "serial" })
+@Test
+public void testSimple() {
+final KafkaProducer producer = 
mock(KafkaProducer.class);
--- End diff --

That's a possibility, though `KafkaBolt` has `KafkaProducer` as a private 
field so `MockProducer` can't be used. That could be fixed by changing that 
field to a `Producer` interface instead.
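
A stripped-down illustration of that change, with hypothetical stand-in types rather than the real Kafka or Storm classes: once the bolt's field is typed as an interface, a test can hand it a fake that records what was sent and returns a completed `Future` instead of `null`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

final class ProducerFieldSketch {

    /** Stand-in for a producer interface (hypothetical, not Kafka's). */
    interface Producer<K, V> {
        Future<Long> send(K key, V value);
    }

    /** Test double: records each value and completes at once with a fake offset. */
    static final class RecordingProducer<K, V> implements Producer<K, V> {
        final List<V> sent = new ArrayList<>();

        @Override
        public Future<Long> send(K key, V value) {
            sent.add(value);
            return CompletableFuture.completedFuture(Long.valueOf(sent.size() - 1L));
        }
    }

    /** Stand-in for the bolt: the field is the interface, not a concrete class. */
    static final class Bolt<K, V> {
        private final Producer<K, V> producer;

        Bolt(Producer<K, V> producer) {
            this.producer = producer;
        }

        Future<Long> execute(K key, V value) {
            return producer.send(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        RecordingProducer<String, String> fake = new RecordingProducer<>();
        Bolt<String, String> bolt = new Bolt<>(fake);
        Future<Long> result = bolt.execute("k", "v");
        System.out.println("offset=" + result.get() + " sent=" + fake.sent);
    }
}
```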


---


[GitHub] storm issue #2791: STORM-3176: KafkaSpout commit offset occurs CommitFailedE...

2018-08-03 Thread wangzzu
Github user wangzzu commented on the issue:

https://github.com/apache/storm/pull/2791
  
@srdo I found that we are actually using the deprecated Subscription 
subtypes, thanks.


---


[GitHub] storm issue #2791: STORM-3176: KafkaSpout commit offset occurs CommitFailedE...

2018-08-03 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2791
  
@wangzzu Yes, but it should also have been fixed in 1.1.2. Please make sure 
you're not using one of the deprecated Subscription subtypes, e.g. 
https://github.com/apache/storm/blob/v1.1.2/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207511238
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -62,6 +62,7 @@
 private OutputCollector collector;
 private TupleToKafkaMapper mapper;
 private KafkaTopicSelector topicSelector;
+private Callback providedCallback;
--- End diff --

I think we need to define our own interface for this. Callback isn't 
serializable, so Storm won't be able to transfer it from the Nimbus submitter 
to the worker JVM.
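
To illustrate the constraint, here is a small JDK-only sketch. It assumes, as the comment implies, that the bolt and its fields go through Java serialization on the way to the worker. `PlainCallback`, `SerializableCallback`, and the two implementations are hypothetical names, not part of Storm or Kafka. An implementation of the interface that extends `Serializable` survives the round trip, while an implementation of the plain interface is rejected.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class CallbackSerializationSketch {

    /** Same shape as a producer callback, but not Serializable. */
    interface PlainCallback {
        void onCompletion(String metadata, Exception error);
    }

    /** Hypothetical Storm-side interface that can be shipped with the bolt. */
    interface SerializableCallback extends PlainCallback, Serializable { }

    static final class LoggingCallback implements SerializableCallback {
        private static final long serialVersionUID = 1L;
        final String prefix;

        LoggingCallback(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public void onCompletion(String metadata, Exception error) {
            System.out.println(prefix + metadata);
        }
    }

    static final class NonSerializableCallback implements PlainCallback {
        @Override
        public void onCompletion(String metadata, Exception error) { }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    /** True if Java serialization accepts the object. */
    static boolean canSerialize(Object o) {
        try {
            serialize(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Serialize and deserialize, as would happen between submitter and worker. */
    static Object roundTrip(Object o) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialize(o)))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new LoggingCallback("sent: ")));
        System.out.println(canSerialize(new NonSerializableCallback()));
        ((LoggingCallback) roundTrip(new LoggingCallback("sent: "))).onCompletion("MY_TOPIC", null);
    }
}
```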


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207515186
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
@@ -37,38 +39,48 @@
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.MockUtil;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaBoltTest {
 private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBoltTest.class);
-
+
 @SuppressWarnings({ "unchecked", "serial" })
-@Test
-public void testSimple() {
-final KafkaProducer producer = 
mock(KafkaProducer.class);
-when(producer.send(any(), any())).thenAnswer(new Answer() {
+private  KafkaBolt makeBolt(KafkaProducer 
mockProducer) {
+Validate.isTrue(MockUtil.isMock(mockProducer), "KafkaProducer 
should be a mock object.");
+when(mockProducer.send(any(), any())).thenAnswer(new 
Answer() {
 @Override
 public Object answer(InvocationOnMock invocation) throws 
Throwable {
 Callback c = (Callback)invocation.getArguments()[1];
 c.onCompletion(null, null);
 return null;
 }
 });
-KafkaBolt bolt = new KafkaBolt() {
+KafkaBolt bolt = new KafkaBolt() {
 @Override
-protected KafkaProducer mkProducer(Properties 
props) {
-return producer;
+protected KafkaProducer mkProducer(Properties props) {
+return mockProducer;
 }
 };
 bolt.withTopicSelector("MY_TOPIC");
-
+
+return bolt;
+}
+
+@SuppressWarnings({ "unchecked", "serial" })
+@Test
+public void testSimple() {
+final KafkaProducer producer = 
mock(KafkaProducer.class);
--- End diff --

I'm wondering if we would be better off using 
https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/MockProducer.html?
 Our own stubbing ends up doing some weird things, e.g. returning null from 
`send`.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207512766
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -62,6 +62,7 @@
 private OutputCollector collector;
 private TupleToKafkaMapper mapper;
 private KafkaTopicSelector topicSelector;
+private Callback providedCallback;
--- End diff --

Also I'm not sure exactly what someone implementing this interface would 
need, but maybe we should add a prepare method to the interface as well, so 
people who need some configuration or the topology context can get access? What 
do you think?
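
One possible shape for such an interface, sketched with JDK types only. The names `PreparableCallback` and `prepare`, and the config key `example.callback.label`, are hypothetical. The point is the lifecycle: the object is serialized with the topology, and anything that is not serializable or that depends on configuration is created in `prepare` on the worker.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

final class PreparableCallbackSketch {

    /** Hypothetical callback interface with a worker-side initialisation hook. */
    interface PreparableCallback extends Serializable {
        void prepare(Map<String, Object> topoConf);

        void onCompletion(String topic, Exception error);
    }

    static final class FailureCounter implements PreparableCallback {
        private static final long serialVersionUID = 1L;

        // transient: not shipped with the topology, built in prepare() instead
        private transient AtomicLong failures;
        private transient String label;

        @Override
        public void prepare(Map<String, Object> topoConf) {
            failures = new AtomicLong();
            label = String.valueOf(topoConf.getOrDefault("example.callback.label", "unnamed"));
        }

        @Override
        public void onCompletion(String topic, Exception error) {
            if (error != null) {
                failures.incrementAndGet();
            }
        }

        String summary() {
            return label + ": " + failures.get() + " failed sends";
        }
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("example.callback.label", "demo");

        FailureCounter callback = new FailureCounter();
        callback.prepare(conf);                              // would run on the worker
        callback.onCompletion("MY_TOPIC", null);             // successful send
        callback.onCompletion("MY_TOPIC", new RuntimeException("timeout"));
        System.out.println(callback.summary());
    }
}
```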


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207514281
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
@@ -37,38 +39,48 @@
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.MockUtil;
--- End diff --

I would prefer not to use internal classes. I think 
https://static.javadoc.io/org.mockito/mockito-core/2.20.0/org/mockito/Mockito.html#mocking_details
 can do the same thing.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-03 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r207511912
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -129,6 +140,27 @@ public void prepare(Map topoConf, 
TopologyContext context, Outpu
 return new KafkaProducer<>(props);
 }
 
+/**
+ * Make the producer Callback. Using this Callback will also execute 
the user defined Callback, if provided.
+ */
+protected Callback mkProducerCallback(final Tuple input) {
--- End diff --

I'm not sure we want to make this protected. The default makes use of some 
private fields (e.g. collector), so subclasses won't be able to implement this 
properly, and I'm also not really understanding why someone would need to 
override this, since the code that is already here is pretty important to the 
bolt working correctly.


---


[GitHub] storm pull request #2791: STORM-3176: KafkaSpout commit offset occurs Commit...

2018-08-03 Thread wangzzu
Github user wangzzu closed the pull request at:

https://github.com/apache/storm/pull/2791


---


[GitHub] storm issue #2791: STORM-3176: KafkaSpout commit offset occurs CommitFailedE...

2018-08-03 Thread wangzzu
Github user wangzzu commented on the issue:

https://github.com/apache/storm/pull/2791
  
@srdo Sorry, the version I used is 1.1.2. This problem has already been 
fixed in 1.2.0.


---


[GitHub] storm issue #2791: STORM-3176: KafkaSpout commit offset occurs CommitFailedE...

2018-08-03 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2791
  
@wangzzu Hi, I'm not sure this should be happening. We're not using the 
`KafkaConsumer.subscribe` API anymore, so Kafka shouldn't be managing the 
partition assignment.

Can you confirm that you're using storm-kafka-client 1.2.0? Could you 
please post your KafkaSpoutConfig? 


---


[GitHub] storm issue #2784: STORM-3166: Make Utils.threadDump account for threads dyi...

2018-08-03 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2784
  
Yes, exactly.


---


[GitHub] storm pull request #2791: STORM-3176: KafkaSpout commit offset occurs Commit...

2018-08-03 Thread wangzzu
GitHub user wangzzu opened a pull request:

https://github.com/apache/storm/pull/2791

STORM-3176: KafkaSpout commit offset occurs CommitFailedException which 
leads to worker dead

KafkaSpout commits offsets through the Consumer commit API (`commitSync` in 
the trace below). If the interval between calls to consumer.poll() exceeds 
max.poll.interval.ms, or the consumer's heartbeat times out, the commit throws a 
CommitFailedException and the worker dies. The log looks like this:

```
2018-07-31 19:19:03.341 o.a.s.util [ERROR] Async loop died!
org.apache.mtkafka.clients.consumer.CommitFailedException: Commit cannot be 
completed since the group has already rebalanced and assigned the partitions to 
another member. This means that the time between subsequent calls to poll() was 
longer than the configured max.poll.interval.ms, which typically implies that the 
poll loop is spending too much time message processing. You can address this 
either by increasing the session timeout or by reducing the maximum size of 
batches returned in poll() with max.poll.records.
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
 ~[stormjar.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
 ~[stormjar.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1126)
 ~[stormjar.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:XXX)
 ~[stormjar.jar:?]
at 
org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:430)
 ~[stormjar.jar:?]
at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:264) 
~[stormjar.jar:?]
at 
org.apache.storm.daemon.executor$fn__10936$fn__10951$fn__10982.invoke(executor.clj:647)
 ~[storm-core-1.1.2-mt001.jar:?]
at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) 
[storm-core-1.1.2-mt001.jar:?]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
2018-07-31 19:19:03.342 o.a.s.d.executor [ERROR]
```

I found that the consumer catches this exception in auto-commit mode. The 
source code is:

```java
private void maybeAutoCommitOffsetsSync(long timeoutMs) {
    if (autoCommitEnabled) {
        Map allConsumedOffsets = subscriptions.allConsumed();
        try {
            log.debug("Sending synchronous auto-commit of offsets {} for group {}",
                    allConsumedOffsets, groupId);
            if (!commitOffsetsSync(allConsumedOffsets, timeoutMs))
                log.debug("Auto-commit of offsets {} for group {} timed out before completion",
                        allConsumedOffsets, groupId);
        } catch (WakeupException | InterruptException e) {
            log.debug("Auto-commit of offsets {} for group {} was interrupted before completion",
                    allConsumedOffsets, groupId);
            // rethrow wakeups since they are triggered by the user
            throw e;
        } catch (Exception e) {
            // consistent with async auto-commit failures, we do not propagate the exception
            log.warn("Auto-commit of offsets {} failed for group {}: {}",
                    allConsumedOffsets, groupId, e.getMessage());
        }
    }
}
```

I think KafkaSpout should do the same: catch the exception so that the worker 
does not die. And when a message fails, the spout should check whether the 
offset of the msgId is larger than the last committed offset (the spout can 
guarantee that all messages with offsets below the last committed offset have 
been acked). If it is not, the message should not be retried.
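
The two proposals can be sketched as follows. This is a JDK-only illustration, not KafkaSpout's code: `Committer`, `CommitFailedException`, `commitQuietly`, and `shouldRetry` are hypothetical stand-ins, and the sketch takes the paragraph's premise as given, namely that every offset at or below the last committed one has already been acked.

```java
final class CommitHandlingSketch {

    /** Stand-in for the consumer's commit failure (hypothetical). */
    static class CommitFailedException extends RuntimeException {
        CommitFailedException(String msg) {
            super(msg);
        }
    }

    /** Stand-in for whatever performs the offset commit. */
    interface Committer {
        void commit(long offset);
    }

    /**
     * Attempt a commit. A failed commit is logged and reported through the
     * return value instead of propagating and killing the calling thread.
     */
    static boolean commitQuietly(Committer committer, long offset) {
        try {
            committer.commit(offset);
            return true;
        } catch (CommitFailedException e) {
            System.err.println("Commit of offset " + offset + " failed: " + e.getMessage());
            return false;
        }
    }

    /**
     * Retry a failed message only if its offset lies beyond the last
     * committed offset; anything at or below it is assumed already acked.
     */
    static boolean shouldRetry(long failedOffset, long lastCommittedOffset) {
        return failedOffset > lastCommittedOffset;
    }

    public static void main(String[] args) {
        Committer failing = offset -> {
            throw new CommitFailedException("group has already rebalanced");
        };
        System.out.println("committed: " + commitQuietly(failing, 42L));
        System.out.println("retry offset 50 after commit 42: " + shouldRetry(50L, 42L));
        System.out.println("retry offset 40 after commit 42: " + shouldRetry(40L, 42L));
    }
}
```

Note that the replies in this thread question whether the exception should occur at all once the spout no longer uses the `KafkaConsumer.subscribe` API, so this only illustrates the proposal as written.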


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangzzu/storm storm-kafka-client

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2791.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2791


commit 54005a2bd28be7928cee05d6500136d3f1fb926d
Author: wangmeng36 
Date:   2018-08-03T08:44:40Z

storm-kafka-client fix the CommitFailedException bug




---


[GitHub] storm issue #2783: [WIP] Make StormMetricsRegistry a regular class rather th...

2018-08-03 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2783
  
I love any approach that switches static to non-static, as long as it 
doesn't require hacks or long parameter lists for injection, and this looks 
great. +1 to move on.


---


[GitHub] storm issue #2773: Blobstore sync bug fix

2018-08-03 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2773
  
@jiangzhileaf Any updates?


---