[GitHub] storm issue #2783: [WIP] Make StormMetricsRegistry a regular class rather th...
Github user srdo commented on the issue: https://github.com/apache/storm/pull/2783 I think it depends a lot on what kind of static method we're talking about. If the method is internal, we can just change it and inject the metric, or even better make the method not static and inject the metrics or other dependencies via the constructor. This is sort of what happened with the memory tracking in Container. I see your point for publicly accessible static methods though. I'm not sure we can do anything to work around it there. ---
[GitHub] storm issue #2783: [WIP] Make StormMetricsRegistry a regular class rather th...
Github user zd-project commented on the issue: https://github.com/apache/storm/pull/2783 Additionally, it still seems less than ideal that if we want to measure things inside a static method, we'd have to pass the metric in as a parameter. ---
[GitHub] storm issue #2783: [WIP] Make StormMetricsRegistry a regular class rather th...
Github user srdo commented on the issue: https://github.com/apache/storm/pull/2783 @zd-project You are right that if someone wants to add metrics to a component, they will have to figure out how to get the registry injected. I can't say up front how hard that will be, except to note that it wasn't too bad to make these changes. For the specific case of the Container, I put the metrics in the ContainerMemoryTracker next to the fields they're gauges for. This is primarily because Container gets created a lot of times, and I didn't want to call the `gauge` method every time the constructor is invoked. In this case I chose to move the gauges to a different class with a different lifecycle from Container, but since the `gauge` method uses `getOrAdd` there probably wouldn't be anything wrong with registering the metrics every time a Container is created instead. If I were adding new metrics to Container, I would either make a metrics container object like SlotMetrics that is common to all Containers and pass that in, or pass in the registry and just invoke the registration every time the constructor is invoked. Unless there's a big performance penalty to using `getOrAdd`, I would think that there wouldn't be much difference. ---
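The constructor-injection pattern described above (a shared metrics object passed into every Container, so construction never re-registers anything) can be sketched as follows. All class names here are stand-ins for illustration, not Storm's real `Container`/`SlotMetrics` types:

```java
// Minimal sketch of constructor-injected metrics (hypothetical classes,
// not Storm's real Container/SlotMetrics).
import java.util.concurrent.atomic.AtomicLong;

public class SharedMetricsSketch {
    // Stand-in for a codahale Meter: just counts events.
    static final class Meter {
        private final AtomicLong count = new AtomicLong();
        void mark() { count.incrementAndGet(); }
        long getCount() { return count.get(); }
    }

    // Analogue of a SlotMetrics-style holder: created once, shared by all containers.
    static final class ContainerMetrics {
        final Meter launches = new Meter();
    }

    // Analogue of Container: receives the shared metrics via its constructor,
    // so no registration happens per instance.
    static final class Container {
        private final ContainerMetrics metrics;

        Container(ContainerMetrics metrics) {
            this.metrics = metrics;
            this.metrics.launches.mark(); // record one launch per construction
        }
    }

    public static void main(String[] args) {
        ContainerMetrics shared = new ContainerMetrics();
        new Container(shared);
        new Container(shared);
        System.out.println(shared.launches.getCount());
    }
}
```

The metrics holder has a different lifecycle from the containers, matching the ContainerMemoryTracker arrangement described in the comment.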
[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2771#discussion_r207672905 --- Diff: storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java --- @@ -12,28 +12,30 @@ package org.apache.storm.metric; +import com.codahale.metrics.ExponentiallyDecayingReservoir; import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricSet; import com.codahale.metrics.Reservoir; + import java.util.Map; import java.util.concurrent.Callable; + +import com.codahale.metrics.Timer; +import org.apache.commons.lang.StringUtils; import org.apache.storm.daemon.metrics.MetricsUtils; import org.apache.storm.daemon.metrics.reporters.PreparableReporter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@SuppressWarnings("unchecked") -public class StormMetricsRegistry { -private static final MetricRegistry DEFAULT_REGISTRY = new MetricRegistry(); +public class StormMetricsRegistry extends MetricRegistry { --- End diff -- Yes, I agree that for the non-MetricSet metrics, we can just use the getOrAdd wrappers. If we don't need MetricSet with a non-static registry, we should be good if we merge the changes in https://github.com/apache/storm/pull/2783. I agree that we should upgrade, but versions past 4.x have removed the metrics-ganglia module. I'm not sure if it's been spun off somewhere, or if it's just been deleted, but I didn't want to start removing stuff related to Ganglia in https://github.com/apache/storm/pull/2783 as well. If we want to upgrade past 3.1 I think we should do it in a separate PR. ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207671937 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java --- @@ -62,6 +62,7 @@ private OutputCollector collector; private TupleToKafkaMapper mapper; private KafkaTopicSelector topicSelector; +private Callback providedCallback; --- End diff -- That looks good to me. I'm also not sure about the collector, but I'd maybe lean toward not including it. The callback won't have access to incoming tuples, so the only use would be if someone wanted the callback to emit a new unanchored tuple. I wouldn't worry about people misusing the prepare method. They can just as easily override the bolt's prepare method. ---
[GitHub] storm issue #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user zd-project commented on the issue: https://github.com/apache/storm/pull/2754 The metrics for IO exceptions in the logviewer are somewhat fuzzy, as I went through the call stack manually and eyeballed all thrown exceptions. I'm not sure if the coverage is complete or if it will over-count. It'd be great if you could help review that part especially. ---
[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2771#discussion_r207665264 --- Diff: storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java --- @@ -50,19 +61,19 @@ public static void startMetricsReporters(Map topoConf) { } } -private static T register(final String name, T metric) { -T ret; +@Override +//This is more similar to super#getOrAdd than super#register +public T register(final String name, T metric) throws IllegalArgumentException { --- End diff -- It is fine. But I would like to see more javadoc for this method. ---
[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2771#discussion_r207663344 --- Diff: storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java --- @@ -50,19 +61,19 @@ public static void startMetricsReporters(Map topoConf) { } } -private static T register(final String name, T metric) { -T ret; +@Override +//This is more similar to super#getOrAdd than super#register +public T register(final String name, T metric) throws IllegalArgumentException { --- End diff -- I am okay with it this way for now. ---
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2752 ---
[GitHub] storm issue #2752: Storm 1311 Migration of UI from clj to Java
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2752 Please squash the commits. ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207649108 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java --- @@ -62,6 +62,7 @@ private OutputCollector collector; private TupleToKafkaMapper mapper; private KafkaTopicSelector topicSelector; +private Callback providedCallback; --- End diff -- What about something like this? Not sure if `OutputCollector` is necessary or not.
```java
public interface PreparedCallback extends Callback, Serializable {
    void prepare(Map topoConf, TopologyContext context, OutputCollector outputCollector);
}
```
So if the `PreparedCallback` is not null, then KafkaBolt would call `.prepare()` during `KafkaBolt.prepare()`. The one concern I have is that it might be open to abuse, tempting people to use it to inject logic into the bolt preparation that's unrelated to Kafka publishing. ---
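The dispatch implied by the `PreparedCallback` proposal above can be sketched like this. `Callback`, `PreparedCallback`, and `SketchBolt` here are simplified stand-ins, not the real Kafka producer `Callback` or Storm's `KafkaBolt`:

```java
// Sketch of preparing an opt-in callback at bolt prepare() time
// (stand-in interfaces, not the real Kafka/Storm types).
import java.util.HashMap;
import java.util.Map;

public class CallbackPrepareSketch {
    interface Callback {
        void onCompletion(Exception exception);
    }

    // The proposed sub-interface: callbacks that need bolt lifecycle hooks opt in.
    interface PreparedCallback extends Callback {
        void prepare(Map<String, Object> topoConf);
    }

    static final class SketchBolt {
        private final Callback providedCallback;

        SketchBolt(Callback providedCallback) {
            this.providedCallback = providedCallback;
        }

        // Only callbacks implementing the richer interface get prepared;
        // a plain Callback passes through untouched.
        void prepare(Map<String, Object> topoConf) {
            if (providedCallback instanceof PreparedCallback) {
                ((PreparedCallback) providedCallback).prepare(topoConf);
            }
        }
    }

    public static void main(String[] args) {
        boolean[] prepared = {false};
        PreparedCallback cb = new PreparedCallback() {
            public void prepare(Map<String, Object> topoConf) { prepared[0] = true; }
            public void onCompletion(Exception exception) { }
        };
        new SketchBolt(cb).prepare(new HashMap<>());
        System.out.println(prepared[0]);
    }
}
```

The `instanceof` check keeps the base `Callback` contract unchanged for users who don't need preparation.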
[GitHub] storm issue #2710: STORM-3099: Extend metrics on supervisor, workers and DRP...
Github user zd-project commented on the issue: https://github.com/apache/storm/pull/2710 Should I squash to one per daemon? ---
[GitHub] storm pull request #2710: STORM-3099: Extend metrics on supervisor, workers ...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2710#discussion_r207645153 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java --- @@ -28,11 +29,14 @@ import org.apache.storm.shade.org.apache.commons.lang.StringUtils; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.ShellUtils; import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ClientSupervisorUtils { +public static final Meter numWorkerLaunchExceptions = ShellUtils.numShellExceptions; --- End diff -- Logged in Jira ---
[GitHub] storm issue #2783: [WIP] Make StormMetricsRegistry a regular class rather th...
Github user zd-project commented on the issue: https://github.com/apache/storm/pull/2783 Extensibility and centralized management would be my concerns. I think this PR has improved the centralized management of metrics. For example, I think the SlotMetrics class is a great example of this (it's actually a very similar idea to the MetricSet I've put in). It's easier to tell this way what we have been tracking for a certain daemon and how we're tracking it. But I'm not quite sure about the extensibility part, i.e., what I should do if I want to have metrics in a new component or one that hasn't had a registry injected yet. For example, in my PR I added a few metrics in Container, but I don't think I see a way to add them here, since Container doesn't have access to StormMetricsRegistry. We might need to do some refactoring before we can actually add in metrics. ---
[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2771#discussion_r207640687 --- Diff: storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java --- @@ -12,28 +12,30 @@ package org.apache.storm.metric; +import com.codahale.metrics.ExponentiallyDecayingReservoir; import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricSet; import com.codahale.metrics.Reservoir; + import java.util.Map; import java.util.concurrent.Callable; + +import com.codahale.metrics.Timer; +import org.apache.commons.lang.StringUtils; import org.apache.storm.daemon.metrics.MetricsUtils; import org.apache.storm.daemon.metrics.reporters.PreparableReporter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@SuppressWarnings("unchecked") -public class StormMetricsRegistry { -private static final MetricRegistry DEFAULT_REGISTRY = new MetricRegistry(); +public class StormMetricsRegistry extends MetricRegistry { --- End diff -- Our customization of MetricRegistry is actually very similar to the wrapping methods around `#getOrAdd`, such as `gauge`, `timer`, `meter`, and `histogram`. They do not have a way to eliminate double registration of a MetricSet, though we could avoid this altogether with a non-static registry. In addition, I think we should probably upgrade to a newer version of Dropwizard, since the current version (3.1.0) is about to be EOL. Their 4.x has a lot of improvements and provides more features on top of Java 8. ---
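The `getOrAdd`-style behavior discussed above can be sketched on top of a plain `ConcurrentHashMap` (a stand-in for MetricRegistry's internal map, not Dropwizard's actual implementation): registering the same name twice returns the existing metric instead of throwing, which is what makes repeated registration from constructors safe.

```java
// Sketch of idempotent getOrAdd-style registration (stand-in registry,
// not Dropwizard's MetricRegistry).
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

public class GetOrAddSketch {
    private final ConcurrentMap<String, Object> metrics = new ConcurrentHashMap<>();

    // The supplier only runs if the name is unbound; a second call with the
    // same name returns the metric registered first.
    @SuppressWarnings("unchecked")
    public <T> T getOrAdd(String name, Supplier<T> supplier) {
        return (T) metrics.computeIfAbsent(name, k -> supplier.get());
    }

    public static void main(String[] args) {
        GetOrAddSketch registry = new GetOrAddSketch();
        Object first = registry.getOrAdd("sketch:num-things", Object::new);
        Object second = registry.getOrAdd("sketch:num-things", Object::new);
        System.out.println(first == second);
    }
}
```

A MetricSet complicates this because registering the set walks its members individually, so the idempotency has to apply per member name rather than per set, which is the double-registration gap the comment points out.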
[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2771#discussion_r207638261 --- Diff: storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java --- @@ -12,28 +12,30 @@ package org.apache.storm.metric; +import com.codahale.metrics.ExponentiallyDecayingReservoir; import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricSet; import com.codahale.metrics.Reservoir; + import java.util.Map; import java.util.concurrent.Callable; + +import com.codahale.metrics.Timer; +import org.apache.commons.lang.StringUtils; import org.apache.storm.daemon.metrics.MetricsUtils; import org.apache.storm.daemon.metrics.reporters.PreparableReporter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@SuppressWarnings("unchecked") -public class StormMetricsRegistry { -private static final MetricRegistry DEFAULT_REGISTRY = new MetricRegistry(); +public class StormMetricsRegistry extends MetricRegistry { --- End diff -- I'll take a look at this ---
[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2771#discussion_r207636496 --- Diff: storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java --- @@ -50,19 +61,19 @@ public static void startMetricsReporters(Map topoConf) { } } -private static T register(final String name, T metric) { -T ret; +@Override +//This is more similar to super#getOrAdd than super#register +public T register(final String name, T metric) throws IllegalArgumentException { --- End diff -- I added a test for registerMetricSet and unregisterMetricSet in #2754 and #2764 to show that this method has solved the issue of double registration. See https://github.com/apache/storm/pull/2754/commits/597c6bb2d41a7aa0d25c6aab9201b451f6b1eaf1 I don't know if we should move the change to `registerMetricSet` and `unregisterMetricSet` back to this PR, but the lack of them seems to be confusing people about the purpose of this change. @Ethanlm @srdo @revans2 ---
[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2771#discussion_r207601720 --- Diff: storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java --- @@ -12,28 +12,30 @@ package org.apache.storm.metric; +import com.codahale.metrics.ExponentiallyDecayingReservoir; import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricSet; import com.codahale.metrics.Reservoir; + import java.util.Map; import java.util.concurrent.Callable; + +import com.codahale.metrics.Timer; +import org.apache.commons.lang.StringUtils; import org.apache.storm.daemon.metrics.MetricsUtils; import org.apache.storm.daemon.metrics.reporters.PreparableReporter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@SuppressWarnings("unchecked") -public class StormMetricsRegistry { -private static final MetricRegistry DEFAULT_REGISTRY = new MetricRegistry(); +public class StormMetricsRegistry extends MetricRegistry { --- End diff -- I think we can make the registry non-static without it being a huge hassle https://github.com/apache/storm/pull/2783. I'll rebase it once https://github.com/apache/storm/pull/2752 goes in, and hopefully we can use it to resolve https://github.com/apache/storm/pull/2714. Regarding the subclassing, I'm wondering if we should instead go ask the Dropwizard guys whether the inability to use `getOrAdd` for MetricSets is intended, and if not whether they'd be open to adding a method to MetricRegistry to allow it. If we're hitting this issue, other people probably are too, and I'd much rather fix it in the library than try to hack around it here. What do you think @zd-project? ---
[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2771#discussion_r207599113 --- Diff: storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java --- @@ -12,28 +12,30 @@ package org.apache.storm.metric; +import com.codahale.metrics.ExponentiallyDecayingReservoir; import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricSet; import com.codahale.metrics.Reservoir; + import java.util.Map; import java.util.concurrent.Callable; + +import com.codahale.metrics.Timer; +import org.apache.commons.lang.StringUtils; import org.apache.storm.daemon.metrics.MetricsUtils; import org.apache.storm.daemon.metrics.reporters.PreparableReporter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@SuppressWarnings("unchecked") -public class StormMetricsRegistry { -private static final MetricRegistry DEFAULT_REGISTRY = new MetricRegistry(); +public class StormMetricsRegistry extends MetricRegistry { --- End diff -- The reason I changed the `static register()` to an instance `register()` method is stated here. I should also point out that because StormMetricsRegistry is a static singleton, it persists across unit tests. So if a test class has multiple tests in it, all metrics will actually be registered multiple times. We would either have to change all unit tests to have set-up and tear-down, or completely revamp the StormMetricsRegistry class to make it non-static. As I don't have that much time to do either of them, I came up with this workaround. @srdo @revans2 @Ethanlm ---
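The collision described above comes from strict `register()` semantics on a shared singleton. The sketch below (stand-in registry, not Dropwizard's) shows why two tests registering the same name against one static registry fail, while a fresh per-test instance does not:

```java
// Sketch of why a static singleton registry breaks repeated test runs:
// strict register() (MetricRegistry-style) rejects duplicate names.
import java.util.HashMap;
import java.util.Map;

public class StrictRegisterSketch {
    private final Map<String, Object> metrics = new HashMap<>();

    // Throws on a duplicate name, like MetricRegistry#register.
    public void register(String name, Object metric) {
        if (metrics.putIfAbsent(name, metric) != null) {
            throw new IllegalArgumentException("A metric named " + name + " already exists");
        }
    }

    public static void main(String[] args) {
        StrictRegisterSketch shared = new StrictRegisterSketch();
        shared.register("num-launches", new Object());      // first test registers
        try {
            shared.register("num-launches", new Object());  // second test, same shared registry
        } catch (IllegalArgumentException expected) {
            System.out.println("duplicate rejected");
        }
        // A non-static registry gives each test its own namespace:
        new StrictRegisterSketch().register("num-launches", new Object());
    }
}
```

Set-up/tear-down that unregisters metrics is the other way out, but a per-instance registry removes the shared state entirely.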
[GitHub] storm pull request #2788: STORM-3170: Fixed bug to eliminate invalid file de...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2788 ---
[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2771#discussion_r207594931 --- Diff: storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java --- @@ -50,19 +61,19 @@ public static void startMetricsReporters(Map topoConf) { } } -private static T register(final String name, T metric) { -T ret; +@Override +//This is more similar to super#getOrAdd than super#register +public T register(final String name, T metric) throws IllegalArgumentException { --- End diff -- If the metric is a MetricSet, it will have the same issue as the old code. ---
[GitHub] storm issue #2788: STORM-3170: Fixed bug to eliminate invalid file deletion
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2788 Still +1 ---
[GitHub] storm pull request #2788: STORM-3170: Fixed bug to eliminate invalid file de...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2788#discussion_r207587787 --- Diff: storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/testsupport/MockRemovableFileBuilder.java --- @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package org.apache.storm.daemon.logviewer.testsupport; + +import org.mockito.Mockito; + +import java.io.File; + +public class MockRemovableFileBuilder extends MockFileBuilder { +@Override +public File build() { +File mockFile = super.build(); +Mockito.when(mockFile.delete()).thenReturn(true); --- End diff -- Okay I'll just file a minor jira then ---
[GitHub] storm pull request #2788: STORM-3170: Fixed bug to eliminate invalid file de...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2788#discussion_r207587402 --- Diff: storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/testsupport/MockRemovableFileBuilder.java --- @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package org.apache.storm.daemon.logviewer.testsupport; + +import org.mockito.Mockito; + +import java.io.File; + +public class MockRemovableFileBuilder extends MockFileBuilder { +@Override +public File build() { +File mockFile = super.build(); +Mockito.when(mockFile.delete()).thenReturn(true); --- End diff -- No, it's just me nitpicking. We can always replace it if it becomes an issue to use in tests. ---
[GitHub] storm pull request #2787: STORM-3169: Correctly convert configured minutes t...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2787 ---
[GitHub] storm pull request #2785: STORM-3167: Copy map before returning in FakeMetri...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2785 ---
[GitHub] storm pull request #2784: STORM-3166: Make Utils.threadDump account for thre...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2784 ---
[GitHub] storm pull request #2788: STORM-3170: Fixed bug to eliminate invalid file de...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2788#discussion_r207586437 --- Diff: storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/testsupport/MockRemovableFileBuilder.java --- @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package org.apache.storm.daemon.logviewer.testsupport; + +import org.mockito.Mockito; + +import java.io.File; + +public class MockRemovableFileBuilder extends MockFileBuilder { +@Override +public File build() { +File mockFile = super.build(); +Mockito.when(mockFile.delete()).thenReturn(true); --- End diff -- Should I still be concerned about this issue then? @revans2 @srdo ---
[GitHub] storm issue #2787: STORM-3169: Correctly convert configured minutes to milli...
Github user zd-project commented on the issue: https://github.com/apache/storm/pull/2787 Squashed. ---
[GitHub] storm issue #2788: STORM-3170: Fixed bug to eliminate invalid file deletion
Github user srdo commented on the issue: https://github.com/apache/storm/pull/2788 +1, thanks for fixing the issue with non-deletable files. The solution looks good. Please squash and we can merge. ---
[GitHub] storm issue #2787: STORM-3169: Correctly convert configured minutes to milli...
Github user srdo commented on the issue: https://github.com/apache/storm/pull/2787 +1, thanks for the quick fix. Please squash and we can merge. ---
[GitHub] storm pull request #2786: STORM-3168 prevent AsyncLocalizer cleanup from cra...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2786 ---
[GitHub] storm issue #2710: STORM-3099: Extend metrics on supervisor, workers and DRP...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2710 Also could we squash the commits some? ---
[GitHub] storm pull request #2710: STORM-3099: Extend metrics on supervisor, workers ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2710#discussion_r207578433 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java --- @@ -1147,8 +1162,19 @@ public DynamicState withPendingLocalization(Future pendingDownload) { */ public DynamicState withState(final MachineState state) { long newStartTime = Time.currentTimeMillis(); +//TODO: potential lost metrics due to timing accuracy (Timer only tracks one call per millisecond) --- End diff -- Please don't leave TODOs in the code. Either fix it, file a follow-on JIRA to fix it, or accept it and just have it be a comment and not a TODO. ---
[GitHub] storm pull request #2710: STORM-3099: Extend metrics on supervisor, workers ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2710#discussion_r207576736 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java --- @@ -28,11 +29,14 @@ import org.apache.storm.shade.org.apache.commons.lang.StringUtils; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.ShellUtils; import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ClientSupervisorUtils { +public static final Meter numWorkerLaunchExceptions = ShellUtils.numShellExceptions; --- End diff -- This all feels a bit too confusing to me. `ShellUtils.numShellExceptions` is static and pulled in from multiple different locations, but only registered once in the supervisor. I would personally rather see `ClientSupervisorUtils` register the Meter; as it is tied to the supervisor, having it do that here is fine. For ShellUtils, I would like to see it not have a static meter, but instead optionally pass a meter in when calling run, or perhaps optionally include it in the constructor. ---
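The suggestion above, passing a meter into the utility instead of keeping a static field on it, can be sketched like this. `Meter` and `ShellRunner` are simplified stand-ins, not Storm's real `ShellUtils` or codahale's `Meter`:

```java
// Sketch of meter injection: the caller that registered the meter supplies
// it; callers that don't track metrics pass null (stand-in classes).
import java.util.concurrent.atomic.AtomicLong;

public class MeterInjectionSketch {
    static final class Meter {
        private final AtomicLong count = new AtomicLong();
        void mark() { count.incrementAndGet(); }
        long getCount() { return count.get(); }
    }

    static final class ShellRunner {
        private final Meter exceptionMeter; // may be null if the caller doesn't track

        ShellRunner(Meter exceptionMeter) {
            this.exceptionMeter = exceptionMeter;
        }

        void run(Runnable command) {
            try {
                command.run();
            } catch (RuntimeException e) {
                if (exceptionMeter != null) {
                    exceptionMeter.mark(); // counted only where a meter was supplied
                }
                throw e;
            }
        }
    }

    public static void main(String[] args) {
        Meter workerLaunchExceptions = new Meter(); // registered once, by the supervisor
        ShellRunner runner = new ShellRunner(workerLaunchExceptions);
        try {
            runner.run(() -> { throw new RuntimeException("launch failed"); });
        } catch (RuntimeException ignored) {
            // expected
        }
        System.out.println(workerLaunchExceptions.getCount());
    }
}
```

With this shape, only the supervisor-owned code path feeds the meter, and the utility itself stays free of registry knowledge.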
[GitHub] storm issue #2771: STORM-3157: General improvement to StormMetricsRegistry
Github user zd-project commented on the issue: https://github.com/apache/storm/pull/2771 `name()` has been removed from this PR. ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207560866 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java --- @@ -129,6 +140,27 @@ public void prepare(Map topoConf, TopologyContext context, Outpu return new KafkaProducer<>(props); } +/** + * Make the producer Callback. Using this Callback will also execute the user defined Callback, if provided. + */ +protected Callback mkProducerCallback(final Tuple input) { --- End diff -- Fair point. I'll change it to `private`. ---
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r207554122 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java --- @@ -0,0 +1,1941 @@ +/** + * Licensed to the Apache Software Foundation (ASF) + * under one or more contributor license agreements. + * See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions + * and limitations under the License. 
+ */ + +package org.apache.storm.daemon.ui; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import javax.servlet.DispatcherType; +import javax.servlet.Servlet; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.SecurityContext; + +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.DaemonConfig; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.BoltAggregateStats; +import org.apache.storm.generated.ClusterSummary; +import org.apache.storm.generated.CommonAggregateStats; +import org.apache.storm.generated.ComponentAggregateStats; +import org.apache.storm.generated.ComponentPageInfo; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.ExecutorSummary; +import org.apache.storm.generated.GetInfoOptions; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.KillOptions; +import org.apache.storm.generated.LogConfig; +import org.apache.storm.generated.LogLevel; +import org.apache.storm.generated.LogLevelAction; +import org.apache.storm.generated.Nimbus; +import org.apache.storm.generated.NimbusSummary; +import org.apache.storm.generated.NodeInfo; +import org.apache.storm.generated.NumErrorsChoice; +import org.apache.storm.generated.OwnerResourceSummary; +import org.apache.storm.generated.ProfileAction; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.generated.RebalanceOptions; +import 
org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.generated.SupervisorPageInfo; +import org.apache.storm.generated.SupervisorSummary; +import org.apache.storm.generated.TopologyHistoryInfo; +import org.apache.storm.generated.TopologyInfo; +import org.apache.storm.generated.TopologyPageInfo; +import org.apache.storm.generated.TopologyStats; +import org.apache.storm.generated.TopologySummary; +import org.apache.storm.generated.WorkerSummary; +import org.apache.storm.logging.filters.AccessLoggingFilter; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.thrift.TException; +import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.TopologySpoutLag; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.VersionInfo; +import org.apache.storm.utils.WebAppUtils; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.SecureRequestCustomizer; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.SslConnectionFactory; +import org.eclipse.jetty.servlet.FilterHolder; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.servlets.CrossOriginFilter; +import org.eclipse.jetty.util.ssl.SslContextFactory;
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r207551893 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/ui/filters/AuthorizedUserFilter.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.ui.filters; + +import java.io.IOException; +import java.net.InetAddress; +import java.security.Principal; +import java.util.Map; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.container.ContainerRequestFilter; +import javax.ws.rs.container.ResourceInfo; +import javax.ws.rs.core.Context; +import javax.ws.rs.ext.Provider; + +import org.apache.storm.DaemonConfig; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.ui.resources.NimbusOp; +import org.apache.storm.security.auth.IAuthorizer; +import org.apache.storm.security.auth.ReqContext; +import org.apache.storm.thrift.TException; +import org.apache.storm.utils.NimbusClient; +import org.apache.storm.utils.Utils; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Provider +public class AuthorizedUserFilter implements ContainerRequestFilter { + +public static final Logger LOG = LoggerFactory.getLogger(AuthorizedUserFilter.class); +public static Map<String, Object> conf = Utils.readStormConfig(); +public static IAuthorizer uiImpersonationHandler; +public static IAuthorizer uiAclHandler; + +@Context private ResourceInfo resourceInfo; + +static { +try { +uiImpersonationHandler = StormCommon.mkAuthorizationHandler( +(String) conf.get(DaemonConfig.NIMBUS_IMPERSONATION_AUTHORIZER), conf); +uiAclHandler = StormCommon.mkAuthorizationHandler( +(String) conf.get(DaemonConfig.NIMBUS_AUTHORIZER), conf); +} catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) { +LOG.error("Error initializing AuthorizedUserFilter: ", e); +throw new RuntimeException(e); +} +} + +@Override +public void filter(ContainerRequestContext containerRequestContext) throws IOException { +NimbusOp annotation = resourceInfo.getResourceMethod().getAnnotation(NimbusOp.class); +if (annotation == null) { +return; +} +String op = annotation.value(); +if (op == null) { +return; +} + +Map<String, Object> topoConf = null; +if
(containerRequestContext.getUriInfo().getPathParameters().containsKey("id")) { +NimbusClient nimbusClient = NimbusClient.getConfiguredClient(Utils.readStormConfig()); +try { +topoConf = (Map) JSONValue.parse(nimbusClient.getClient().getTopologyConf( + containerRequestContext.getUriInfo().getPathParameters().get("id").get(0))); +} catch (TException e) { +e.printStackTrace(); --- End diff -- This appears to be the part that is causing the issues. Printing the stacktrace is not logging it, so it will not show up where we want it. And if we are handling it properly it will not matter. An IOException is not going to be mapped properly into the result we want. Also, all errors we don't expect (500 errors) will be returned as HTML. Let's have a generic mapper that can take any Throwable and turn it into a JSON response? Also, at the same time, can we change this IOException so it has e as the cause, and in the generic exception mapper can we have it look for a cause of Authorized, etc. and map it to the proper response? ---
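[Editor's note] The generic mapper suggested above could be sketched as follows. The class, method, and map contents below are illustrative only, not Storm's actual code: the core logic (walk the cause chain, pick a status code, emit JSON) is shown in plain Java, and in the real webapp it would be wrapped in a `javax.ws.rs.ext.ExceptionMapper<Throwable>` registered with Jersey, which turns the (status, JSON) pair into a `Response`.

```java
import java.util.Map;

// Sketch of a generic Throwable -> JSON mapper (names are illustrative).
public class GenericExceptionMapperSketch {

    // Map well-known cause types (matched by simple class name here, to keep the
    // sketch self-contained) to HTTP status codes; anything else becomes a 500.
    private static final Map<String, Integer> STATUS_BY_CAUSE = Map.of(
        "AuthorizationException", 403,
        "NotAliveException", 404
    );

    // Walk the cause chain looking for a recognized exception type.
    public static int statusFor(Throwable t) {
        for (Throwable cause = t; cause != null; cause = cause.getCause()) {
            Integer status = STATUS_BY_CAUSE.get(cause.getClass().getSimpleName());
            if (status != null) {
                return status;
            }
        }
        return 500; // unexpected errors still become JSON, not an HTML error page
    }

    // Render the error as a small JSON body; a real implementation would use a
    // JSON library instead of hand-escaping.
    public static String toJson(Throwable t) {
        String msg = t.getMessage() == null ? t.getClass().getName() : t.getMessage();
        return "{\"error\":\"" + msg.replace("\\", "\\\\").replace("\"", "\\\"") + "\"}";
    }
}
```

With this shape, the filter's `catch (TException e)` block can rethrow with `e` as the cause and let the mapper translate it, instead of calling `e.printStackTrace()`.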
[GitHub] storm issue #2752: Storm 1311 Migration of UI from clj to Java
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2752 I may have found out more. In the logs I see ``` Caused by: java.io.IOException: Unable to fetch topo conf for topo id wc-1-1533264135 at org.apache.storm.daemon.ui.filters.AuthorizedUserFilter.filter(AuthorizedUserFilter.java:85) ~[storm-webapp-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] at org.glassfish.jersey.server.ContainerFilteringStage.apply(ContainerFilteringStage.java:132) ~[jersey-server-2.27.jar:?] ``` So it looks like the authorizer is throwing an IOException which is causing the 500 error ---
[GitHub] storm issue #2752: Storm 1311 Migration of UI from clj to Java
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2752 I am seeing these when I run DRPC and the logviewer. They appear to still work so this is minor, but it would be nice to fix them in a follow-on JIRA if you cannot do it quickly now. ``` WARNING: A provider org.apache.storm.daemon.logviewer.webapp.LogviewerResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.storm.daemon.logviewer.webapp.LogviewerResource will be ignored. ``` ``` WARNING: A provider org.apache.storm.daemon.drpc.webapp.DRPCResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.storm.daemon.drpc.webapp.DRPCResource will be ignored. ``` Also I found that the exception mapping does not appear to be working. If you go to the topology page for a topology that is not up (i.e. http://localhost:8080/topology.html?id=wc-1-1533264135 ) I see a stack trace for the NotAliveException being printed out to stdout or stderr (not sure which) ``` NotAliveException(msg:wc-1-1533264135) at org.apache.storm.generated.Nimbus$getTopologyConf_result$getTopologyConf_resultStandardScheme.read(Nimbus.java:45081) at org.apache.storm.generated.Nimbus$getTopologyConf_result$getTopologyConf_resultStandardScheme.read(Nimbus.java:45059) at org.apache.storm.generated.Nimbus$getTopologyConf_result.read(Nimbus.java:44993) ... ``` But the request is returning an HTML response ``` Error 500 Request failed. HTTP ERROR 500 Problem accessing /api/v1/topology/wc-1-1533264135. Reason: Request failed. Powered by Jetty:// 9.4.7.v20170914 ``` ---
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r207542398 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerResource.java --- @@ -137,7 +137,7 @@ public Response searchLogs(@Context HttpServletRequest request) throws IOExcepti String user = httpCredsHandler.getUserName(request); String topologyId = request.getParameter("topoId"); String portStr = request.getParameter("port"); -String callback = request.getParameter("callback"); +String callback = request.getParameter("callbackParameterName"); --- End diff -- So why are we changing `/searchLogs?callback=bar` to `/searchLogs?callbackParameterName=bar`? If that is what we are doing it is a regression, if not I am really missing something. ---
[GitHub] storm pull request #2788: STORM-3170: Fixed bug to eliminate invalid file de...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2788#discussion_r207539609 --- Diff: storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/testsupport/MockRemovableFileBuilder.java --- @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package org.apache.storm.daemon.logviewer.testsupport; + +import org.mockito.Mockito; + +import java.io.File; + +public class MockRemovableFileBuilder extends MockFileBuilder { +@Override +public File build() { +File mockFile = super.build(); +Mockito.when(mockFile.delete()).thenReturn(true); --- End diff -- @zd-project you can control it more fully, but it gets a little more complicated. Not a big deal but if it is causing problems it can be done. ---
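[Editor's note] The "more fully controlled" stubbing mentioned above usually means scripting a different result for each invocation. With Mockito that is chained stubbing, e.g. `when(mockFile.delete()).thenReturn(true, false)` (first call true, later calls false) or a `thenAnswer(...)` that inspects state. The sketch below shows the same per-call scripting as a plain `File` subclass, so it stays dependency-free; the class name and constructor are hypothetical, not part of the PR.

```java
import java.io.File;
import java.util.Iterator;
import java.util.List;

// A fake File whose delete() results are scripted per invocation.
public class ScriptedRemovableFile extends File {
    private final Iterator<Boolean> results;
    private boolean lastResult;

    public ScriptedRemovableFile(String path, List<Boolean> deleteResults) {
        super(path);
        this.results = deleteResults.iterator();
    }

    @Override
    public boolean delete() {
        // Return the next scripted result; once the script runs out,
        // keep repeating the last result. Nothing is actually deleted.
        if (results.hasNext()) {
            lastResult = results.next();
        }
        return lastResult;
    }
}
```

This lets a test simulate, say, a deletion that succeeds once and then starts failing, which a single `thenReturn(true)` cannot express.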
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207537361 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java --- @@ -62,6 +62,7 @@ private OutputCollector collector; private TupleToKafkaMapper mapper; private KafkaTopicSelector topicSelector; +private Callback providedCallback; --- End diff -- Ah, good point. The workaround I was using prior to this was all done after serialization so this wasn't something I had to worry about. Let me think about what that interface should look like and I'll come up with something. ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207534991 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java --- @@ -129,6 +140,27 @@ public void prepare(Map topoConf, TopologyContext context, Outpu return new KafkaProducer<>(props); } +/** + * Make the producer Callback. Using this Callback will also execute the user defined Callback, if provided. + */ +protected Callback mkProducerCallback(final Tuple input) { --- End diff -- I can change to `private`. ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207526335 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java --- @@ -37,38 +39,48 @@ import org.apache.storm.tuple.Tuple; import org.junit.Test; import org.mockito.ArgumentMatcher; +import org.mockito.internal.util.MockUtil; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class KafkaBoltTest { private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class); - + @SuppressWarnings({ "unchecked", "serial" }) -@Test -public void testSimple() { -final KafkaProducer producer = mock(KafkaProducer.class); -when(producer.send(any(), any())).thenAnswer(new Answer() { +private KafkaBolt makeBolt(KafkaProducer mockProducer) { +Validate.isTrue(MockUtil.isMock(mockProducer), "KafkaProducer should be a mock object."); +when(mockProducer.send(any(), any())).thenAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { Callback c = (Callback)invocation.getArguments()[1]; c.onCompletion(null, null); return null; } }); -KafkaBolt bolt = new KafkaBolt() { +KafkaBolt bolt = new KafkaBolt() { @Override -protected KafkaProducer mkProducer(Properties props) { -return producer; +protected KafkaProducer mkProducer(Properties props) { +return mockProducer; } }; bolt.withTopicSelector("MY_TOPIC"); - + +return bolt; +} + +@SuppressWarnings({ "unchecked", "serial" }) +@Test +public void testSimple() { +final KafkaProducer producer = mock(KafkaProducer.class); --- End diff -- That would be fine. ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207524658 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java --- @@ -37,38 +39,48 @@ import org.apache.storm.tuple.Tuple; import org.junit.Test; import org.mockito.ArgumentMatcher; +import org.mockito.internal.util.MockUtil; --- End diff -- Nice, didn't know about that one. ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user dfdemar commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207524275 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java --- @@ -37,38 +39,48 @@ import org.apache.storm.tuple.Tuple; import org.junit.Test; import org.mockito.ArgumentMatcher; +import org.mockito.internal.util.MockUtil; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class KafkaBoltTest { private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class); - + @SuppressWarnings({ "unchecked", "serial" }) -@Test -public void testSimple() { -final KafkaProducer producer = mock(KafkaProducer.class); -when(producer.send(any(), any())).thenAnswer(new Answer() { +private KafkaBolt makeBolt(KafkaProducer mockProducer) { +Validate.isTrue(MockUtil.isMock(mockProducer), "KafkaProducer should be a mock object."); +when(mockProducer.send(any(), any())).thenAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { Callback c = (Callback)invocation.getArguments()[1]; c.onCompletion(null, null); return null; } }); -KafkaBolt bolt = new KafkaBolt() { +KafkaBolt bolt = new KafkaBolt() { @Override -protected KafkaProducer mkProducer(Properties props) { -return producer; +protected KafkaProducer mkProducer(Properties props) { +return mockProducer; } }; bolt.withTopicSelector("MY_TOPIC"); - + +return bolt; +} + +@SuppressWarnings({ "unchecked", "serial" }) +@Test +public void testSimple() { +final KafkaProducer producer = mock(KafkaProducer.class); --- End diff -- That's a possibility, though `KafkaBolt` has `KafkaProducer` as a private field so `MockProducer` can't be used. That could be fixed by changing that field to a `Producer` interface instead. ---
[GitHub] storm issue #2791: STORM-3176: KafkaSpout commit offset occurs CommitFailedE...
Github user wangzzu commented on the issue: https://github.com/apache/storm/pull/2791 @srdo I found that we are using the deprecated Subscription subtypes actually, thx. ---
[GitHub] storm issue #2791: STORM-3176: KafkaSpout commit offset occurs CommitFailedE...
Github user srdo commented on the issue: https://github.com/apache/storm/pull/2791 @wangzzu Yes, but it should also have been fixed in 1.1.2. Please make sure you're not using one of the deprecated Subscription subtypes, e.g. https://github.com/apache/storm/blob/v1.1.2/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207511238 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java --- @@ -62,6 +62,7 @@ private OutputCollector collector; private TupleToKafkaMapper mapper; private KafkaTopicSelector topicSelector; +private Callback providedCallback; --- End diff -- I think we need to define our own interface for this. Callback isn't serializable, so Storm won't be able to transfer it from the Nimbus submitter to the worker JVM. ---
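[Editor's note] A minimal sketch of the kind of interface being proposed, under hypothetical names (this is not the API that was eventually merged): it extends `Serializable` so Storm can ship it from the topology submitter to the worker JVM, and it carries a `prepare` hook, anticipating srdo's follow-up comment about configuration access. Parameters are plain types here so the sketch itself has no Kafka or Storm dependency.

```java
import java.io.Serializable;
import java.util.Map;

// Hypothetical serializable replacement for Kafka's non-serializable Callback.
public interface PreparableKafkaCallback extends Serializable {

    // Called once on the worker, mirroring IBolt.prepare; the TopologyContext
    // parameter is omitted to keep the sketch free of Storm dependencies.
    default void prepare(Map<String, Object> topoConf) {}

    // Called per produced record, mirroring the shape of
    // org.apache.kafka.clients.producer.Callback#onCompletion.
    void onCompletion(String topic, long offset, Exception exception);
}
```

Because the interface extends `Serializable`, even a lambda implementing it is serializable (provided its captured state is), so it survives topology submission.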
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207515186 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java --- @@ -37,38 +39,48 @@ import org.apache.storm.tuple.Tuple; import org.junit.Test; import org.mockito.ArgumentMatcher; +import org.mockito.internal.util.MockUtil; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class KafkaBoltTest { private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class); - + @SuppressWarnings({ "unchecked", "serial" }) -@Test -public void testSimple() { -final KafkaProducer producer = mock(KafkaProducer.class); -when(producer.send(any(), any())).thenAnswer(new Answer() { +private KafkaBolt makeBolt(KafkaProducer mockProducer) { +Validate.isTrue(MockUtil.isMock(mockProducer), "KafkaProducer should be a mock object."); +when(mockProducer.send(any(), any())).thenAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { Callback c = (Callback)invocation.getArguments()[1]; c.onCompletion(null, null); return null; } }); -KafkaBolt bolt = new KafkaBolt() { +KafkaBolt bolt = new KafkaBolt() { @Override -protected KafkaProducer mkProducer(Properties props) { -return producer; +protected KafkaProducer mkProducer(Properties props) { +return mockProducer; } }; bolt.withTopicSelector("MY_TOPIC"); - + +return bolt; +} + +@SuppressWarnings({ "unchecked", "serial" }) +@Test +public void testSimple() { +final KafkaProducer producer = mock(KafkaProducer.class); --- End diff -- I'm wondering if we would be better off using https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/MockProducer.html? Our own stubbing ends up doing some weird things, e.g. returning null from `send`. ---
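[Editor's note] To make the "weird things" concrete: the hand-rolled Mockito stub in the diff returns `null` from `send`, whereas Kafka's `MockProducer` returns a real, already-completed future and records every send for later assertions via `history()`. Since kafka-clients is not assumed available here, the sketch below imitates that behavior with simplified stand-in types; nothing in it is Kafka's actual API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Dependency-free imitation of MockProducer's useful properties for tests.
public class RecordingFakeProducer {
    private final List<String> history = new ArrayList<>();

    // Unlike the null-returning stub, send() yields a real completed Future,
    // as MockProducer does when autocomplete is enabled. The "offset" is just
    // the record's position in the history.
    public Future<Long> send(String record) {
        history.add(record);
        return CompletableFuture.completedFuture((long) (history.size() - 1));
    }

    // Mirrors MockProducer#history(): everything sent so far, for assertions.
    public List<String> history() {
        return List.copyOf(history);
    }
}
```

A test can then assert on what was produced instead of re-stubbing `send` for every scenario.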
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207512766 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java --- @@ -62,6 +62,7 @@ private OutputCollector collector; private TupleToKafkaMapper mapper; private KafkaTopicSelector topicSelector; +private Callback providedCallback; --- End diff -- Also I'm not sure exactly what someone implementing this interface would need, but maybe we should add a prepare method to the interface as well, so people who need some configuration or the topology context can get access? What do you think? ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207514281 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java --- @@ -37,38 +39,48 @@ import org.apache.storm.tuple.Tuple; import org.junit.Test; import org.mockito.ArgumentMatcher; +import org.mockito.internal.util.MockUtil; --- End diff -- I would prefer not to use internal classes. I think https://static.javadoc.io/org.mockito/mockito-core/2.20.0/org/mockito/Mockito.html#mocking_details can do the same thing. ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r207511912 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java --- @@ -129,6 +140,27 @@ public void prepare(Map topoConf, TopologyContext context, Outpu return new KafkaProducer<>(props); } +/** + * Make the producer Callback. Using this Callback will also execute the user defined Callback, if provided. + */ +protected Callback mkProducerCallback(final Tuple input) { --- End diff -- I'm not sure we want to make this protected. The default makes use of some private fields (e.g. collector), so subclasses won't be able to implement this properly, and I'm also not really understanding why someone would need to override this, since the code that is already here is pretty important to the bolt working correctly. ---
[GitHub] storm pull request #2791: STORM-3176: KafkaSpout commit offset occurs Commit...
Github user wangzzu closed the pull request at: https://github.com/apache/storm/pull/2791 ---
[GitHub] storm issue #2791: STORM-3176: KafkaSpout commit offset occurs CommitFailedE...
Github user wangzzu commented on the issue: https://github.com/apache/storm/pull/2791 @srdo Sorry, the version I used is 1.1.2. This problem has already been fixed in 1.2.0. ---
[GitHub] storm issue #2791: STORM-3176: KafkaSpout commit offset occurs CommitFailedE...
Github user srdo commented on the issue: https://github.com/apache/storm/pull/2791 @wangzzu Hi, I'm not sure this should be happening. We're not using the `KafkaConsumer.subscribe` API anymore, so Kafka shouldn't be managing the partition assignment. Can you confirm that you're using storm-kafka-client 1.2.0? Could you please post your KafkaSpoutConfig? ---
[GitHub] storm issue #2784: STORM-3166: Make Utils.threadDump account for threads dyi...
Github user srdo commented on the issue: https://github.com/apache/storm/pull/2784 Yes, exactly. ---
[GitHub] storm pull request #2791: STORM-3176: KafkaSpout commit offset occurs Commit...
GitHub user wangzzu opened a pull request: https://github.com/apache/storm/pull/2791 STORM-3176: KafkaSpout commit offset occurs CommitFailedException which leads to worker death. KafkaSpout uses the commitAsync API of Consumer; if the interval between calls to consumer.poll() exceeds max.poll.interval.ms, or the consumer heartbeat times out, a CommitFailedException occurs and then the worker dies. The log looks like this:

```
2018-07-31 19:19:03.341 o.a.s.util [ERROR] Async loop died!
org.apache.mtkafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698) ~[stormjar.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577) ~[stormjar.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1126) ~[stormjar.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:XXX) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:430) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:264) ~[stormjar.jar:?]
    at org.apache.storm.daemon.executor$fn__10936$fn__10951$fn__10982.invoke(executor.clj:647) ~[storm-core-1.1.2-mt001.jar:?]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.1.2-mt001.jar:?]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
2018-07-31 19:19:03.342 o.a.s.d.executor [ERROR]
```

I find that the consumer itself catches the exception in auto-commit mode; the source code is:

```java
private void maybeAutoCommitOffsetsSync(long timeoutMs) {
    if (autoCommitEnabled) {
        Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
        try {
            log.debug("Sending synchronous auto-commit of offsets {} for group {}", allConsumedOffsets, groupId);
            if (!commitOffsetsSync(allConsumedOffsets, timeoutMs))
                log.debug("Auto-commit of offsets {} for group {} timed out before completion", allConsumedOffsets, groupId);
        } catch (WakeupException | InterruptException e) {
            log.debug("Auto-commit of offsets {} for group {} was interrupted before completion", allConsumedOffsets, groupId);
            // rethrow wakeups since they are triggered by the user
            throw e;
        } catch (Exception e) {
            // consistent with async auto-commit failures, we do not propagate the exception
            log.warn("Auto-commit of offsets {} failed for group {}: {}", allConsumedOffsets, groupId, e.getMessage());
        }
    }
}
```

I think KafkaSpout should do the same: catch the exception to avoid killing the worker. And when a message fails to be acked, the spout should check whether the offset of that message ID is larger than the last committed offset (the spout can guarantee that messages with offsets below the last committed offset are all acked); if not, the message should not be retried. You can merge this pull request into a Git repository by running: $ git pull https://github.com/wangzzu/storm storm-kafka-client Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2791.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2791 commit 54005a2bd28be7928cee05d6500136d3f1fb926d Author: wangmeng36 Date: 2018-08-03T08:44:40Z storm-kafka-client fix the CommitFailedException bug ---
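[Editor's note] The guard being proposed here could look roughly like the sketch below. `CommitFailedException` is stood in by a local class so the sketch has no Kafka dependency, and the class and method names are illustrative, not the spout's actual code.

```java
// Sketch: treat a failed synchronous commit as a warning instead of letting the
// exception kill the worker. After a CommitFailedException the rebalance has
// already reassigned the partitions, so the commit can safely be retried (or
// redone by the new partition owner) on a later cycle.
public class GuardedCommit {

    // Local stand-in for org.apache.kafka.clients.consumer.CommitFailedException.
    public static class CommitFailedException extends RuntimeException {
        public CommitFailedException(String msg) { super(msg); }
    }

    /** Runs the commit action; returns true on success, false if the commit failed. */
    public static boolean tryCommit(Runnable commitAction) {
        try {
            commitAction.run();
            return true;
        } catch (CommitFailedException e) {
            // Mirrors Kafka's own auto-commit path: log and carry on, don't rethrow.
            System.err.println("Offset commit failed, will retry on next cycle: " + e.getMessage());
            return false;
        }
    }
}
```

Only `CommitFailedException` is swallowed; any other exception still propagates, so genuine bugs are not hidden.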
[GitHub] storm issue #2783: [WIP] Make StormMetricsRegistry a regular class rather th...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2783 I love all these approaches that switch static to non-static when they don't require any hacks or long parameter lists for injection, and this looks great. +1 to move on. ---
[GitHub] storm issue #2773: Blobstore sync bug fix
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2773 @jiangzhileaf Any updates? ---