[GitHub] storm pull request #2698: STORM-2882: shade storm-client dependencies
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2698#discussion_r193077896 --- Diff: shaded-deps/pom.xml --- @@ -0,0 +1,272 @@ modelVersion 4.0.0; parent org.apache.storm:storm:2.0.0-SNAPSHOT (relativePath ..); artifact org.apache.storm:shaded-deps, packaging jar, name "Shaded Deps for Storm Client", description "Shaded version of dependencies used only for internal storm code."; dependencies: com.google.guava:guava, org.apache.curator:curator-framework, org.apache.curator:curator-client, org.apache.curator:curator-recipes, org.apache.zookeeper:zookeeper, jline:jline, org.apache.yetus:audience-annotations, commons-io:commons-io (scope compile) --- End diff -- +1 I tried to remove/clean up the ones I copied and pasted, but I guess I missed one. ---
[GitHub] storm pull request #2698: STORM-2882: shade storm-client dependencies
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2698#discussion_r193077799 --- Diff: shaded-deps/pom.xml --- @@ -0,0 +1,272 @@ modelVersion 4.0.0; parent org.apache.storm:storm:2.0.0-SNAPSHOT (relativePath ..); artifact org.apache.storm:shaded-deps, packaging jar, name "Shaded Deps for Storm Client", description "Shaded version of dependencies used only for internal storm code."; dependencies: com.google.guava:guava, org.apache.curator:curator-framework, org.apache.curator:curator-client
[GitHub] storm issue #2700: [STORM-3093] Cache the storm id to executors mapping on m...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2700 The Travis failure looks unrelated. ---
[GitHub] storm issue #2691: STORM-3061: Update version of hbase
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2691 I am planning right now to get STORM-2882 in first, and then I will come back and do as much manual testing as possible for the different components, and update things accordingly. ---
[GitHub] storm issue #2698: STORM-2882: shade storm-client dependencies
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2698 @HeartSaVioR I have addressed the other review comments, but I could not find a place in the documentation for instructions on how to do a release. I agree that we want to publish the shaded-deps too, or clients will not be able to find their dependencies. Just not sure where that is documented, or what scripts there are that do it, if any. ---
[GitHub] storm pull request #2701: STORM-3091 don't allow workers to create heartbeat...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2701#discussion_r192757264 --- Diff: storm-client/src/jvm/org/apache/storm/utils/VersionedStore.java --- @@ -25,9 +25,17 @@ private String _root; -public VersionedStore(String path) throws IOException { +/** + * Creates a store at the given path. + * + * @path The path for the store --- End diff -- Should be `@param path The path for the store` ---
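For reference, a minimal sketch of the corrected Javadoc. `VersionedStoreDoc` is a hypothetical stand-in, not Storm's `VersionedStore`: it only records the path, and the `@throws` contract and empty-path check are assumptions added so the constructor has something to document. With the bad tag, javadoc's doclint should report `@path` as an unknown tag.

```java
import java.io.IOException;

/** Hypothetical stand-in for VersionedStore, used only to show the Javadoc tags. */
class VersionedStoreDoc {
    private final String root;

    /**
     * Creates a store at the given path.
     *
     * @param path The path for the store
     * @throws IOException if the store cannot be created (assumed contract for this sketch)
     */
    VersionedStoreDoc(String path) throws IOException {
        if (path == null || path.isEmpty()) {
            throw new IOException("store path must not be empty");
        }
        this.root = path;
    }

    /** Returns the root path this store was created with. */
    String getRoot() {
        return root;
    }
}
```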
[GitHub] storm issue #2669: [STORM-3055] remove conext connection cache
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2669 Actually we just hit this in production so thanks again @pczb for finding and fixing this. ---
[GitHub] storm issue #2698: STORM-2882: shade storm-client dependencies
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2698 Shading here is different from how it is in 1.x. For this one there is a separate package that creates a shaded uber jar. The code inside storm-client and storm-server that needs to use it calls the shaded APIs directly. The upside is that IDEs work great, because they operate on the shaded APIs and don't have to worry about how shading works. The downside is that doing a full build is now a two-step process. This is because of a "bug" in maven/shade where shading in a multi-module project results in downstream builds not seeing the shaded dependencies. I cannot really combine them into a single build step, because the version of maven with this bug in it is now standard in most places, so our "fix" for 1.x is not really valid any more. ---
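To make "call the shaded APIs directly" concrete: a shade relocation rewrites a package prefix, and storm-client then imports the rewritten names. The plain-Java sketch below only mimics that name rewrite; the `org.apache.storm.shade.` prefix is an assumption for illustration, and `RelocationSketch` is hypothetical, not part of the maven-shade-plugin (which rewrites bytecode, not strings).

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical illustration of a shade relocation: a package prefix is rewritten to a new prefix.
 * The real maven-shade-plugin applies this rewrite to bytecode, not just to strings.
 */
class RelocationSketch {
    // Original package prefix -> shaded package prefix, checked in insertion order.
    private final Map<String, String> relocations = new LinkedHashMap<>();

    void addRelocation(String pattern, String shadedPattern) {
        relocations.put(pattern, shadedPattern);
    }

    /** Returns the relocated class name, or the name unchanged if no prefix matches. */
    String relocate(String className) {
        for (Map.Entry<String, String> e : relocations.entrySet()) {
            String pattern = e.getKey();
            // Match whole package segments only, so "com.google.commonx" is not rewritten.
            if (className.startsWith(pattern + ".")) {
                return e.getValue() + className.substring(pattern.length());
            }
        }
        return className;
    }
}
```

Under that assumed prefix, storm-client source would import something like `org.apache.storm.shade.com.google.common.collect.ImmutableList`, an ordinary class in the shaded-deps jar, which is why the IDE needs no knowledge of shading.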
[GitHub] storm issue #2698: STORM-2882: shade storm-client dependencies
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2698 Dependencies for the client in 1.2.1: asm; clojure (removed in 2.x); disruptor (removed by this patch); gmetric4j; kryo; log4j2; log4j1.2-api; metrics-core; metrics-ganglia; metrics-graphite; minlog; objenesis; reflectasm; ring-cors (removed in 2.x); servlet-api-2.5 (removed by this patch); slf4j; storm-core (removed in 2.x); storm-rename-hack (removed in 2.x). The only dependency on the storm client classpath that was not in 1.2.1 is org.acplt:oncrpc:jar:1.0.7, which is pulled in by a newer version of io.dropwizard.metrics:metrics-ganglia. ---
[GitHub] storm pull request #2698: STORM-2882: shade storm-client dependencies
GitHub user revans2 opened a pull request: https://github.com/apache/storm/pull/2698 STORM-2882: shade storm-client dependencies You can merge this pull request into a Git repository by running: $ git pull https://github.com/revans2/incubator-storm STORM-2882 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2698.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2698 commit 95082cec14cd03bc566db2b0e90555446b519082 Author: Robert (Bobby) Evans Date: 2018-05-15T15:05:23Z STORM-2882: Shade Guava and Curator commit 3f6a43f04e02bb238312fff242eb7c4e499d3fdc Author: Robert (Bobby) Evans Date: 2018-05-15T23:28:20Z STORM-2882: Moved Servlet API to Server commit d20757b08b21b2b714e02a7f17a793b11052f2cd Author: Robert (Bobby) Evans Date: 2018-05-16T00:19:02Z STORM-2882: Change Auth Methods to match standards commit df0fd118dd0e953df25bb7ad3d656cbcccd84626 Author: Robert (Bobby) Evans Date: 2018-05-16T12:27:25Z Missed 2 files commit 11d5f50f9737f173d764b1cdfd7010274026b6b0 Author: Robert (Bobby) Evans Date: 2018-05-16T12:32:37Z Small config change commit 9ca3adf6bd0cbdf093bc2a1c6116c7e51e89d2ab Author: Robert (Bobby) Evans Date: 2018-05-16T14:51:47Z STORM-2882: Shaded zookeeper commit 89b11f68483d9d1940abd9054fdeac37f2739c5b Author: Robert (Bobby) Evans Date: 2018-05-16T17:05:34Z STORM-2882: jline commit 2f023ef20e9af1d4d3bbe55dc67c2e9bf6e1df29 Author: Robert (Bobby) Evans Date: 2018-05-25T21:56:52Z STORM-2882: Fixed some issues with tests with guava conflicts commit 13315e8787e5597a912e0351c5ecdf447551e098 Author: Robert (Bobby) Evans Date: 2018-05-29T16:34:49Z STORM-2882: commons-{lang,collections,io} commit 2ed99008776494c58a33ada5cbdc1179e6703158 Author: Robert (Bobby) Evans Date: 2018-05-29T18:05:07Z STORM-2882: jctools commit 6fb2ce8e77b05c8faa143e496056295a1a7d16f5 Author: Robert (Bobby) Evans Date: 2018-05-29T18:57:28Z 
STORM-2882: JAXB commit 4f81520d756fdd2616a9a23f26af9a35bb5f846c Author: Robert (Bobby) Evans Date: 2018-05-29T19:42:12Z STORM-2882: json-simple commit 169d1979b40f97ca35f39743f4c9964ef6ba3e4e Author: Robert (Bobby) Evans Date: 2018-05-29T20:26:52Z STORM-2882: snakeyaml commit 857d4ed05ac4430b9d0fbe561ae8986b74d2cb83 Author: Robert (Bobby) Evans Date: 2018-05-30T12:27:10Z STORM-2882: netty commit a7d784afc521a01665fa494ee1a499038db8cee1 Author: Robert (Bobby) Evans Date: 2018-05-30T13:09:09Z STORM-2882: jgrapht commit 77cc82ade1baa752a0ac1f5872e46d8f20d41aaf Author: Robert (Bobby) Evans Date: 2018-05-30T15:37:55Z STORM-2882: thrift commit 87109c6f70a0843b8940fea7fe93199fa14d1126 Author: Robert (Bobby) Evans Date: 2018-05-30T17:03:57Z STORM-2882: removed dep declaration commit ea60f13f96b3325889a7a6921b925beb41781a16 Author: Robert (Bobby) Evans Date: 2018-05-30T18:23:42Z STORM-2882: commons-codec httpclient commit 084c8589531be585dc8a920f7da4078e3a1723dd Author: Robert (Bobby) Evans Date: 2018-05-30T19:09:15Z STORM-2882: httpcore commit 513dd9214ca746082791b17b576eccd3ab18f7a5 Author: Robert (Bobby) Evans Date: 2018-05-30T20:04:15Z STORM-2882: sysout-over-slf4j ---
[GitHub] storm pull request #2694: STORM-3061: Upgrade version of hdrhistogram
GitHub user revans2 opened a pull request: https://github.com/apache/storm/pull/2694 STORM-3061: Upgrade version of hdrhistogram Not much uses hdrhistogram: storm-metrics, storm-loadgen, storm-starter, storm-elasticsearch, and storm-elasticsearch-examples. The ES code is not running because of what appear to be issues with the version of guava. When we shade guava that should take care of it, and I will be able to test these changes better. I think I am going to change focus for a while and put in shading so we can fix some of these issues and actually test these things. I manually tested the others and the code works. You can merge this pull request into a Git repository by running: $ git pull https://github.com/revans2/incubator-storm STORM-3061-hdrhisto Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2694.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2694 commit 5e11cb99e94b6157b1aa7b97f0eaa57faa2d5df5 Author: Robert (Bobby) Evans Date: 2018-05-25T20:02:16Z STORM-3061: Upgrade version of hdrhistogram ---
[GitHub] storm issue #2691: STORM-3061: Update version of hbase
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2691 Yes, that was the plan. There is a lot that depends on storm-autocreds, and I would like to understand it all better before I try to clean it up. ---
[GitHub] storm issue #2691: STORM-3061: Update version of hbase
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2691 @arunmahadevan I am happy to try and split up autocreds to make that happen, but it is a much larger job than what is currently in scope here. If you are fine with waiting, I would rather file a follow-on JIRA to upgrade to 2.0.0 and split up autocreds than block this. ---
[GitHub] storm pull request #2691: STORM-3061: Update version of hbase
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2691#discussion_r190432849 --- Diff: pom.xml --- @@ -294,7 +294,7 @@ 0.14.0 2.6.1 ${hadoop.version} -1.1.12 +1.4.4 --- End diff -- In 2.0 TokenUtil is a part of hbase-server. I will spend some time to see if there is something I can do to work around this, but I really don't want to require that the server code be shipped with all of the clients. Perhaps I can refactor autocreds so the nimbus portions are split off into a separate package. ---
[GitHub] storm pull request #2691: STORM-3061: Update version of hbase
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2691#discussion_r190428653 --- Diff: pom.xml --- @@ -294,7 +294,7 @@ 0.14.0 2.6.1 ${hadoop.version} -1.1.12 +1.4.4 --- End diff -- I am happy to give it a try. I was a bit cautious with it being a major version change, and that there was no 2.0.1 yet. But it does fit better with the move to hadoop 3.1 coming in another pull request, probably after this one is merged in. ---
[GitHub] storm pull request #2691: STORM-3061: Update version of hbase
GitHub user revans2 opened a pull request: https://github.com/apache/storm/pull/2691 STORM-3061: Update version of hbase This updates the version of hbase used and cleans up some of the dependencies. The biggest change besides updating the version is that we remove storm-server as a dependency, because it was only used for access to a copy of StringUtils. This ends up impacting a lot of packages that were pulling in storm-hbase, either directly or indirectly: storm-autocreds; storm-hdfs (because it depends on storm-autocreds); flux-core (not really sure why the core of flux needs hbase, but it is a dependency); flux-examples; storm-sql-hdfs (because of storm-autocreds); storm-hdfs-blobstore (autocreds again); storm-hive (autocreds yet again); storm-starter; storm-hdfs-examples (autocreds); storm-hbase-examples; storm-hive-examples (autocreds); storm-perf (autocreds). I have not run any of the manual tests for this yet, but I plan on doing some soon. You can merge this pull request into a Git repository by running: $ git pull https://github.com/revans2/incubator-storm STORM-3061-hbase-cleanup Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2691.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2691 commit 63506a0a3b66f7dc2100480282d99fda8537d2f4 Author: Robert (Bobby) Evans Date: 2018-05-23T21:53:16Z STORM-3061: Update version of hbase ---
[GitHub] storm pull request #2688: STORM-3061: Remove unused core dependencies
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2688#discussion_r190401760 --- Diff: bin/storm.py --- @@ -705,7 +705,6 @@ def nimbus(klass="org.apache.storm.daemon.nimbus.Nimbus"): cppaths = [CLUSTER_CONF_DIR] jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [ "-Dlogfile.name=nimbus.log", - "-DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector", --- End diff -- So a long time ago we tried out async logging for log4j. It kind of helped, but also caused some issues. It puts the log messages into a disruptor queue and has a background thread that handles writing them out. In my big patch, after upgrading the version of log4j and removing disruptor from the classpath, async logging started to get errors about disruptor not being on the classpath. I don't know which caused it, or if it was a combination of things, but I thought that if I was removing disruptor from the classpath, this would be the right place to remove the async logging too. ---
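A minimal sketch of the async pattern described above: callers only enqueue, and one background thread drains the queue and writes. This is not log4j2's implementation; it uses a `java.util.concurrent.LinkedBlockingQueue` in place of the LMAX Disruptor ring buffer that log4j2's async loggers rely on (which is why they need the disruptor jar at runtime), and an in-memory list stands in for the log file. `AsyncLogSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Minimal async logger sketch: log() only enqueues; a single background thread writes.
 * A LinkedBlockingQueue stands in for the Disruptor ring buffer used by log4j2.
 */
class AsyncLogSketch implements AutoCloseable {
    // A distinct String instance, compared by reference, marks the end of the stream.
    private static final String POISON = new String("<shutdown>");

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> written = new ArrayList<>(); // stands in for the log file
    private final Thread writer;

    AsyncLogSketch() {
        writer = new Thread(() -> {
            try {
                while (true) {
                    String msg = queue.take();
                    if (msg == POISON) {
                        return;
                    }
                    synchronized (written) {
                        written.add(msg);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "async-log-writer");
        writer.setDaemon(true);
        writer.start();
    }

    /** Returns immediately; the message is written later by the background thread. */
    void log(String msg) {
        queue.add(msg);
    }

    /** Enqueues the shutdown marker and waits until everything before it is written. */
    @Override
    public void close() throws InterruptedException {
        queue.add(POISON);
        writer.join();
    }

    List<String> written() {
        synchronized (written) {
            return new ArrayList<>(written);
        }
    }
}
```

The caller never blocks on I/O, which is where the speedup comes from; the cost is that messages still in the queue are lost if the process dies before `close()` drains them.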
[GitHub] storm pull request #2690: STORM-3061: Clean up some storm-druid dependencies
GitHub user revans2 opened a pull request: https://github.com/apache/storm/pull/2690 STORM-3061: Clean up some storm-druid dependencies `mvn dependency:tree` showed that the only real dependency changes were for scala and for jackson, which went from 2.4.6 to 2.9.4. I am not totally sure how to test this currently, as there is an open issue right now saying that it does not work. If others want to try this, feel free; if not, we can try to fix any remaining issues as a part of https://issues.apache.org/jira/browse/STORM-2884 You can merge this pull request into a Git repository by running: $ git pull https://github.com/revans2/incubator-storm STORM-3061-druid Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2690.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2690 commit 71f78eccda533be2c84e1df26100e5ada9f13c12 Author: Robert (Bobby) Evans Date: 2018-05-23T20:44:13Z STORM-3061: Clean up some storm-druid dependencies ---
[GitHub] storm issue #2687: STORM-3061: thrift 0.11
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2687 Yes I reran the integration tests manually and they all passed. ---
[GitHub] storm issue #2687: STORM-3061: thrift 0.11
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2687 The integration test failures look unrelated to this change. ---
[GitHub] storm pull request #2689: STORM-3061: rocket, jms, and mqtt updates
GitHub user revans2 opened a pull request: https://github.com/apache/storm/pull/2689 STORM-3061: rocket, jms, and mqtt updates This updates some dependencies for the jms examples, mqtt examples, and rocketmq examples, and updates the activemq implementation used for testing jms, rocketmq, and activemq. I ran some manual tests of the example topologies that I could find. I couldn't make all of them really work, because there was no clear documentation for most of them on how to set up or run the example topologies. The mqtt examples using flink had no documentation at all, so I wasn't able to successfully run anything. storm-jms-examples fails with what appears to be a JNI issue, but it fails exactly the same way on 1.2.3-SNAPSHOT too. If someone with more experience with these could improve the documentation about how to run them, that would be great. You can merge this pull request into a Git repository by running: $ git pull https://github.com/revans2/incubator-storm STORM-3061-rmq Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2689.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2689 commit 95ea31da49825632c4fb6068cb50d8d315c59775 Author: Robert (Bobby) Evans Date: 2018-05-23T14:41:48Z STORM-3061: Remove unneeded deps from rocketmq-examples commit 433689c95526c3be92b3bec2fd3dbcfe6198fb64 Author: Robert (Bobby) Evans Date: 2018-05-23T17:53:44Z STORM-3061: mqtt-update ---
[GitHub] storm pull request #2687: STORM-3061: thrift 0.11
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2687#discussion_r190287991 --- Diff: storm-client/src/genthrift.sh --- @@ -17,7 +17,7 @@ rm -rf gen-javabean gen-py py rm -rf jvm/org/apache/storm/generated -thrift --gen java:beans,hashcode,nocamel,generated_annotations=undated --gen py:utf8strings storm.thrift --- End diff -- @srdo do you want me to remove the utf8strings option? ---
[GitHub] storm pull request #2688: STORM-3061: Remove unused core dependencies
GitHub user revans2 opened a pull request: https://github.com/apache/storm/pull/2688 STORM-3061: Remove unused core dependencies This is for disruptor and java.jmx You can merge this pull request into a Git repository by running: $ git pull https://github.com/revans2/incubator-storm STORM-3061-remove-core-deps Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2688.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2688 commit 100a4daf060a851a247c954cbe2c03d39ea4d8d8 Author: Robert (Bobby) Evans Date: 2018-05-23T15:12:06Z STORM-3061: Remove unused core dependencies ---
[GitHub] storm pull request #2687: STORM-3061: thrift 0.11
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2687#discussion_r190067973 --- Diff: storm-client/src/genthrift.sh --- @@ -17,7 +17,7 @@ rm -rf gen-javabean gen-py py rm -rf jvm/org/apache/storm/generated -thrift --gen java:beans,hashcode,nocamel,generated_annotations=undated --gen py:utf8strings storm.thrift --- End diff -- Not sure why we had hashcode to begin with. The 0.9.3 compiler's --help output shows:
```
  csharp (C#):
    ...
    hashcode:        Generate a hashcode and equals implementation for classes.
```
So it is generating hash codes for the C# language. I believe that it was also being used for python, but that is not documented in thrift. In 0.11.0 it is still there in the help output for C#, but now if I run without the change I get the error:
```
[FAILURE:generation:1] Error: unknown option java:hashcode
```
---
[GitHub] storm pull request #2687: STORM-3061: thrift 0.11
GitHub user revans2 opened a pull request: https://github.com/apache/storm/pull/2687 STORM-3061: thrift 0.11 This moves to thrift 0.11. The only files changed are pom.xml and genthrift.sh; the other files are all generated. You can merge this pull request into a Git repository by running: $ git pull https://github.com/revans2/incubator-storm STORM-3061-thrift Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2687.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2687 commit 1fd1e17fb2ede7bbb872bfca9ac6dd50fa58efd5 Author: Robert (Bobby) Evans Date: 2018-05-22T21:26:21Z STORM-3061: thrift 0.11 ---
[GitHub] storm pull request #2675: STORM-3061: Upgrade lots of dependencies
Github user revans2 closed the pull request at: https://github.com/apache/storm/pull/2675 ---
[GitHub] storm issue #2675: STORM-3061: Upgrade lots of dependencies
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2675 I'll try to break it down into more manageable chunks that are split up by component as much as possible. Many of the dependencies cross boundaries, though; I will call those out in each individual pull request. @srdo I am going to close this pull request so I can break it up. I will look at/fix what you have called out. ---
[GitHub] storm pull request #2684: STORM-3079 add getMessage() support for thrift exc...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2684#discussion_r189076711 --- Diff: storm-client/src/jvm/org/apache/storm/utils/WrappedAuthorizationException.java --- @@ -0,0 +1,17 @@ +package org.apache.storm.utils; --- End diff -- Missing the license header. ---
[GitHub] storm issue #2669: [STORM-3055] remove conext connection cache
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2669 Once you make the corresponding changes on #2661 I would be happy to merge them both in. ---
[GitHub] storm issue #2675: STORM-3061: Upgrade lots of dependencies
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2675 The failure looks like it might be related to #2674 ---
[GitHub] storm issue #2618: [STORM-2905] Fix KeyNotFoundException when kill a storm a...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2618 @danny0405 Sorry about the long delay; I also got rather busy with other things. My personal choice would be a combination of 1 and 2. We have run into an issue internally where, very rarely, a blob may be uploaded to nimbus as part of submitting a topology and then deleted before the topology itself can be submitted. We are likely to fix this by using a variant of 1, where we give ourselves a few minutes after we see a blob with no corresponding topology before we decide it is fine to delete it. That should fix the issue in 99% of the cases, and also fix the upload issue. ---
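A sketch of that grace-period variant, assuming the cleanup pass can list the blob keys in the store and the keys still referenced by active topologies. `OrphanBlobCleaner`, its method, and the blob key names are hypothetical, not Nimbus code; the choice to remember a first-seen timestamp per orphaned blob is an assumption about how "a few minutes after we see a blob" could be tracked.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch: a blob is deleted only after it has had no owning topology
 * for at least the grace period, so an upload racing with submission survives.
 */
class OrphanBlobCleaner {
    private final long gracePeriodMs;
    // When each currently-orphaned blob was first seen without an owner.
    private final Map<String, Long> firstSeenOrphanedMs = new HashMap<>();

    OrphanBlobCleaner(long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    /**
     * @param blobKeys  keys currently in the blob store
     * @param ownedKeys keys referenced by an active topology
     * @param nowMs     current time in milliseconds
     * @return keys that have been orphaned for at least the grace period
     */
    List<String> blobsToDelete(Set<String> blobKeys, Set<String> ownedKeys, long nowMs) {
        // Stop tracking blobs that are gone or have since been claimed by a topology.
        firstSeenOrphanedMs.keySet().removeIf(k -> !blobKeys.contains(k) || ownedKeys.contains(k));

        List<String> toDelete = new ArrayList<>();
        for (String key : blobKeys) {
            if (ownedKeys.contains(key)) {
                continue;
            }
            long firstSeen = firstSeenOrphanedMs.computeIfAbsent(key, k -> nowMs);
            if (nowMs - firstSeen >= gracePeriodMs) {
                toDelete.add(key);
            }
        }
        toDelete.forEach(firstSeenOrphanedMs::remove);
        return toDelete;
    }
}
```

Because the clock starts when a blob is first seen without an owner, a jar uploaded just before its topology is submitted survives as long as the submission completes within the grace period.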
[GitHub] storm issue #2633: STORM-3028 HdfsSpout: handle empty file in case of ackers
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2633 @ghajos any plans to address the question from @srdo? ---
[GitHub] storm issue #2651: [STORM-3054] Add Topology level configuration socket time...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2651 @kishorvpatil any update on the comments from @HeartSaVioR ? ---
[GitHub] storm issue #2661: [STORM-3055] 1.1.x remove conext connection cache
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2661 @pczb I have the same comments that I put on the master branch version of the pull request. ---
[GitHub] storm pull request #2669: [STORM-3055] remove conext connection cache
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2669#discussion_r188055495 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java --- @@ -451,7 +451,6 @@ public int getPort() { public void close() { if (!closing) { LOG.info("closing Netty Client {}", dstAddressPrefixedName); -context.removeClient(dstHost, dstAddress.getPort()); --- End diff -- This line is the only reason to pass context into the Client. Please remove all references to context if we are going to remove this. ---
[GitHub] storm pull request #2669: [STORM-3055] remove conext connection cache
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2669#discussion_r188056330 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java --- @@ -65,20 +65,8 @@ public synchronized IConnection bind(String storm_id, int port) { * establish a connection to a remote server */ public synchronized IConnection connect(String storm_id, String host, int port, AtomicBoolean[] remoteBpStatus) { -IConnection connection = connections.get(key(host, port)); --- End diff -- Please rename connections to `serverConnections` as it is only for Servers now. Also it would be nice to change it back to a list or a set instead of a map, because we will no longer be using it as a cache, so we don't need to pull out a specific value any longer. We just want it so we can close them on termination. ---
[GitHub] storm pull request #2669: [STORM-3055] remove conext connection cache
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2669#discussion_r188055726 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java --- @@ -65,20 +65,8 @@ public synchronized IConnection bind(String storm_id, int port) { * establish a connection to a remote server */ public synchronized IConnection connect(String storm_id, String host, int port, AtomicBoolean[] remoteBpStatus) { --- End diff -- This no longer needs to be synchronized. That was used to protect connections, and is no longer needed if all we do is create a new Client. ---
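Taken together, the three review comments on Client and Context suggest a shape like the following sketch. The names loosely mirror the diff but are hypothetical, not Storm's real classes: the real `connect` also takes an `AtomicBoolean[] remoteBpStatus` argument, omitted here, and the client would no longer receive the context at all.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical minimal connection; the real IConnection has many more methods. */
class SketchConnection {
    final String name;
    private volatile boolean closed;

    SketchConnection(String name) {
        this.name = name;
    }

    void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

/** Sketch of Context after removing the client-side cache (not Storm's real class). */
class ContextSketch {
    // Only servers are tracked, and only so term() can close them; guarded by "this".
    private final Set<SketchConnection> serverConnections = new HashSet<>();

    synchronized SketchConnection bind(String stormId, int port) {
        SketchConnection server = new SketchConnection("server:" + port);
        serverConnections.add(server);
        return server;
    }

    // Touches no shared state, so it needs no synchronization; the caller owns the result.
    SketchConnection connect(String stormId, String host, int port) {
        return new SketchConnection("client:" + host + ":" + port);
    }

    synchronized void term() {
        for (SketchConnection server : serverConnections) {
            server.close();
        }
        serverConnections.clear();
    }
}
```

With this shape, `term()` closes only the server side; clients are closed by whoever created them.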
[GitHub] storm issue #2669: [STORM-3055] remove conext connection cache
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2669 The original code added this so that when shutting down the context we could be sure that all ongoing connections were also terminated as cleanly as possible. After this change that is only true for the server side and not the client side. What are the ramifications of that? I think it was just defensive programming, so it is probably fine to make this change from that perspective, but I would like to make sure that my understanding of the problem matches yours. From the comments in STORM-3055, the error appears to be triggered when a supervisor is shut down, wiped clean (so it gets a new supervisor id), and then brought back up. At that point nimbus schedules the worker on the same node/slot as before, but with a new supervisor ID. This confuses the connection caching, because when updating the connections it gets a list of connections to shut down and a separate list of connections to create. The new connections are created first, but in this case no new connection is actually made, because the cache already has one open to that host and port. Then the old, unneeded connections are torn down, but in this case the connection being closed is the one that is still needed. Looking at the javadocs for IContext, it looks like the caching really does violate the spec, but it is a bit of a gray area. https://github.com/apache/storm/blob/53f38bc31f2fd315a520ba6b86c0a60be08381cc/storm-client/src/jvm/org/apache/storm/messaging/IContext.java#L48-L57 I am fine with removing the caching like this JIRA does, but I do want to see the code cleaned up, because without the caching there is a lot of extra code that can be removed. ---
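The failure sequence can be reproduced in miniature. In this hypothetical sketch (not the worker's actual code), endpoints are identified by supervisor id, host, and port, while the old cache keys connections by host and port only. `refresh` creates missing connections first and then tears down unwanted ones, following the order described in the comment.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/** Miniature model of the STORM-3055 failure; all names are hypothetical. */
class ConnectionRefreshDemo {
    /** Endpoint as the worker tracks it: the supervisor id is part of its identity. */
    static final class Endpoint {
        final String supervisorId;
        final String host;
        final int port;

        Endpoint(String supervisorId, String host, int port) {
            this.supervisorId = supervisorId;
            this.host = host;
            this.port = port;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Endpoint)) {
                return false;
            }
            Endpoint other = (Endpoint) o;
            return supervisorId.equals(other.supervisorId) && host.equals(other.host) && port == other.port;
        }

        @Override
        public int hashCode() {
            return Objects.hash(supervisorId, host, port);
        }
    }

    static final class Conn {
        boolean closed;
    }

    private final boolean cacheByHostPort;
    private final Map<String, Conn> cache = new HashMap<>();

    ConnectionRefreshDemo(boolean cacheByHostPort) {
        this.cacheByHostPort = cacheByHostPort;
    }

    Conn connect(Endpoint e) {
        if (!cacheByHostPort) {
            return new Conn();
        }
        // Old behaviour: reuse whatever is already open for host:port.
        return cache.computeIfAbsent(e.host + ":" + e.port, k -> new Conn());
    }

    void disconnect(Endpoint e, Conn c) {
        if (cacheByHostPort) {
            // Old behaviour: close whatever is registered for host:port, even if it was handed out again.
            Conn cached = cache.remove(e.host + ":" + e.port);
            if (cached != null) {
                cached.closed = true;
            }
        } else {
            c.closed = true;
        }
    }

    /** Creates connections for new endpoints first, then closes the ones no longer wanted. */
    Map<Endpoint, Conn> refresh(Map<Endpoint, Conn> current, Set<Endpoint> wanted) {
        Map<Endpoint, Conn> next = new HashMap<>();
        for (Endpoint e : wanted) {
            Conn existing = current.get(e);
            next.put(e, existing != null ? existing : connect(e));
        }
        for (Map.Entry<Endpoint, Conn> old : current.entrySet()) {
            if (!wanted.contains(old.getKey())) {
                disconnect(old.getKey(), old.getValue());
            }
        }
        return next;
    }
}
```

With `cacheByHostPort` set to true, the connection handed out for the new supervisor id ends up closed; with the cache removed, as this pull request does, it stays open.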
[GitHub] storm issue #2675: STORM-3061: Upgrade lots of dependencies
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2675 The Travis failure is an interesting one: sql-core is failing, but none of the tests are failing; the JUnit JVM exits badly as part of shutdown. I have not been able to reproduce it locally, but I'll keep trying. ---
[GitHub] storm pull request #2675: STORM-3061: Upgrade lots of dependencies
GitHub user revans2 opened a pull request: https://github.com/apache/storm/pull/2675 STORM-3061: Upgrade lots of dependencies This upgrades lots of dependencies, including thrift. The thrift changes are based off of an earlier patch done by @arunmahadevan Please skip over the generated files and look mostly at the other files. I have only tested that the unit tests pass and that a single node cluster comes up and runs, with similar performance. I did make a few minor changes in some places to be able to remove dependencies that are not used/needed any more (hbase-server from storm-hbase just to get some string functions). You can merge this pull request into a Git repository by running: $ git pull https://github.com/revans2/incubator-storm STORM-3061 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2675.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2675 commit b2428e9d672dc0c8b94a6f7acc7e87ca239070f1 Author: Robert (Bobby) Evans Date: 2018-05-08T17:30:40Z STORM-3061: Upgrade lots of dependencies ---
[GitHub] storm issue #2647: STORM-3040: Improve scheduler performance
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2647 @danny0405 the failure is a known race condition around netty and is not related to this change. ---
[GitHub] storm issue #2664: STORM-2884: Remove storm-druid
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2664 @arunmahadevan and @srdo https://issues.apache.org/jira/browse/STORM-2882 is for adding shading back in to the storm client. I am happy to take that up next. My real concern is the long-term support for tranquility. It looks like there is consensus that dropping the druid bolt/state is not what is wanted, so I will close this pull request. ---
[GitHub] storm pull request #2664: STORM-2884: Remove storm-druid
Github user revans2 closed the pull request at: https://github.com/apache/storm/pull/2664 ---
[GitHub] storm issue #2647: STORM-3040: Improve scheduler performance
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2647 @danny0405 I added in the comments about thread safety like you suggested. ---
[GitHub] storm pull request #2647: STORM-3040: Improve scheduler performance
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2647#discussion_r186756808 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java --- @@ -763,6 +773,7 @@ public void setAssignments( assertValidTopologyForModification(assignment.getTopologyId()); } assignments.clear(); +totalResourcesPerNodeCache.clear(); --- End diff -- I tried that, but it didn't give the performance boost I was hoping for. The vast majority of the performance problem came from recomputing the value each time we wanted to sort, which for GRAS is once per executor. So without the cache, for a large topology we were recomputing things hundreds of thousands of times. With the cache it is only once per node in the cluster, which ends up being relatively small. In practice the noise between runs drowned out any improvement, so I opted not to make the change. ---
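A sketch of why the cache matters: sorting nodes once per executor recomputes every node's total each time, while a memo keyed by node computes each total once until assignments change. Only the field name `totalResourcesPerNodeCache` comes from the diff; the class, the `ToDoubleFunction` cost model, and the miss counter are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToDoubleFunction;

/** Hypothetical sketch of a per-node memo that is cleared whenever assignments change. */
class PerNodeTotalsCache {
    private final Map<String, Double> totalResourcesPerNodeCache = new HashMap<>();
    private final ToDoubleFunction<String> expensiveTotal;
    private int computations; // counts cache misses, for illustration only

    PerNodeTotalsCache(ToDoubleFunction<String> expensiveTotal) {
        this.expensiveTotal = expensiveTotal;
    }

    /** Computes a node's total at most once until the next invalidate(). */
    double totalFor(String nodeId) {
        return totalResourcesPerNodeCache.computeIfAbsent(nodeId, id -> {
            computations++;
            return expensiveTotal.applyAsDouble(id);
        });
    }

    /** Mirrors clearing the cache in setAssignments(): any assignment change invalidates it. */
    void invalidate() {
        totalResourcesPerNodeCache.clear();
    }

    int computations() {
        return computations;
    }
}
```

For 1,000 executors on a 3-node cluster, this computes 3 totals instead of 3,000.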
[GitHub] storm pull request #2647: STORM-3040: Improve scheduler performance
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2647#discussion_r186754134 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java --- @@ -48,6 +49,9 @@ public class Cluster implements ISchedulingState { private static final Logger LOG = LoggerFactory.getLogger(Cluster.class); +private static final Function> MAKE_SET = (x) -> new HashSet<>(); +private static final Function> MAKE_MAP = (x) -> new HashMap<>(); --- End diff -- I am happy to add a comment to Cluster itself about it, as none of Cluster is currently thread safe. As for parallel scheduling, the plan we had been thinking about was more around scheduling multiple topologies in parallel, rather than trying to make a single scheduler strategy multi-threaded, but both have advantages and disadvantages. ---
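The archived diff has lost the generic type parameters of `MAKE_SET` and `MAKE_MAP`. A sketch of how such a shared factory is typically passed to `Map.computeIfAbsent`, with assumed types, plus the kind of thread-safety note discussed above; `NodeToExecutors` is hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/**
 * Hypothetical sketch of the shared-factory idiom. Like Cluster, this class is NOT thread safe:
 * the HashMap and its HashSet values are mutated without synchronization.
 */
class NodeToExecutors {
    // One shared mapping function reused by every computeIfAbsent call; type parameters are assumed.
    private static final Function<String, Set<String>> MAKE_SET = (x) -> new HashSet<>();

    private final Map<String, Set<String>> executorsPerNode = new HashMap<>();

    void add(String nodeId, String executorId) {
        // Creates the node's set on first use, then adds to it.
        executorsPerNode.computeIfAbsent(nodeId, MAKE_SET).add(executorId);
    }

    Set<String> executorsOn(String nodeId) {
        return Collections.unmodifiableSet(executorsPerNode.getOrDefault(nodeId, Collections.emptySet()));
    }
}
```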
[GitHub] storm pull request #2664: STORM-2884: Remove storm-druid
GitHub user revans2 opened a pull request: https://github.com/apache/storm/pull/2664 STORM-2884: Remove storm-druid STORM-2884 is an issue where the dependency versions pulled in by Druid Tranquility conflict with the versions used by Storm. I personally think the right way to fix this issue is to drop Tranquility. The code has not seen a release since Jun 29, 2016. https://github.com/druid-io/tranquility/releases Honestly, the project looks like it is close to dead. Couple that with the fact that Druid supports ingesting results from Kafka (http://druid.io/blog/2013/08/30/loading-data.html), and to me there really is not much reason to try to maintain this plugin. You can merge this pull request into a Git repository by running: $ git pull https://github.com/revans2/incubator-storm STORM-2884 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2664.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2664 commit d63ab11cc5e1f1ce58e419852f3ad520a2020186 Author: Robert (Bobby) Evans Date: 2018-05-07T18:45:42Z STORM-2884: Remove storm-druid ---
[GitHub] storm issue #2647: STORM-3040: Improve scheduler performance
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2647 Oh, I forgot to mention: I also restored something I had messed up before, and added anti-affinity back into GRAS. ---
[GitHub] storm issue #2647: STORM-3040: Improve scheduler performance
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2647 @danny0405 @kishorvpatil With some recent changes to master my patch started to fail with some checkstyle issues. I have rebased and fixed all of the issues. Please take a look again, specifically the second commit and let me know. ---
[GitHub] storm issue #2657: STORM-3048 : A Potential NPE
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2657 I am now getting this failure:
```
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-checkstyle-plugin:2.17:check (validate) on project storm-server: You have 792 Checkstyle violations. The maximum number of allowed violations is 783. -> [Help 1]
```
Specifically:
```
/Users/evans/src/apache-commit/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
101: WhitespaceAround: 'assert' is not followed by whitespace. Empty blocks may only be represented as {} when not part of a multi-block statement (4.1.3)
101: WhitespaceAround: '!=' is not preceded with whitespace.
101: WhitespaceAround: '!=' is not followed by whitespace. Empty blocks may only be represented as {} when not part of a multi-block statement (4.1.3)
139: WhitespaceAround: 'assert' is not followed by whitespace. Empty blocks may only be represented as {} when not part of a multi-block statement (4.1.3)
139: WhitespaceAround: '!=' is not preceded with whitespace.
139: WhitespaceAround: '!=' is not followed by whitespace. Empty blocks may only be represented as {} when not part of a multi-block statement (4.1.3)
178: WhitespaceAround: 'assert' is not followed by whitespace. Empty blocks may only be represented as {} when not part of a multi-block statement (4.1.3)
178: WhitespaceAround: '!=' is not preceded with whitespace.
178: WhitespaceAround: '!=' is not followed by whitespace. Empty blocks may only be represented as {} when not part of a multi-block statement (4.1.3)
```
---
[GitHub] storm issue #2647: STORM-3040: Improve scheduler performance
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2647 @danny0405 In my tests `TestResourceAwareScheduler.testLargeTopologiesCommon` went from about 7 minutes to about 7 seconds. For `TestResourceAwareScheduler.testLargeTopologiesOnLargeClustersGras` I don't have a before value because I killed it after an hour. The after is about 7 seconds per topology, or about a minute and a half in total. ---
[GitHub] storm issue #2641: [STORM-3037] Lowering CheckStyle Violations across all mo...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2641 Still +1 ---
[GitHub] storm pull request #2647: STORM-3040: Improve scheduler performance
GitHub user revans2 opened a pull request: https://github.com/apache/storm/pull/2647 STORM-3040: Improve scheduler performance There are a lot of different scheduler improvements. Mostly they fall into three groups: caching, storing data in multiple ways so we can look it up quickly, and lazily sorting the nodes in a rack only when needed instead of all ahead of time. I also added performance tests. They currently pass on Travis, but I would like to hear from others on whether this solution looks good or whether there is a better way for us to do performance testing. You can merge this pull request into a Git repository by running: $ git pull https://github.com/revans2/incubator-storm STORM-3040 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2647.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2647 commit 8d3e5cf0e6f7f90d3007159d30c3456bd9749b1f Author: Robert (Bobby) Evans Date: 2018-04-24T20:19:32Z STORM-3040: Improve scheduler performance ---
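To illustrate the "lazily sort nodes in a rack only when needed" idea, here is a minimal sketch using a hypothetical `LazySortedNodes` wrapper (not the PR's actual class). A rack's nodes are sorted the first time someone iterates them, so racks the scheduler never reaches never pay for a sort.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

/** Hypothetical helper: defers sorting a rack's nodes until someone actually iterates them. */
class LazySortedNodes<T> implements Iterable<T> {
    private final List<T> nodes;
    private final Comparator<? super T> order;
    private boolean sorted = false;
    private int sortCount = 0; // for illustration: how many times a sort was paid for

    LazySortedNodes(List<T> nodes, Comparator<? super T> order) {
        this.nodes = new ArrayList<>(nodes);
        this.order = order;
    }

    @Override
    public Iterator<T> iterator() {
        if (!sorted) { // sort once, on first use only
            nodes.sort(order);
            sorted = true;
            sortCount++;
        }
        return Collections.unmodifiableList(nodes).iterator();
    }

    int getSortCount() {
        return sortCount;
    }
}
```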
[GitHub] storm pull request #2630: STORM-3024: Allow for scheduling to happen in the ...
Github user revans2 closed the pull request at: https://github.com/apache/storm/pull/2630 ---
[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2634#discussion_r182750384 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -3032,9 +3019,19 @@ public void submitTopologyWithOpts(String topoName, String uploadedJarLocation, // if the other config does not have it set. topology = normalizeTopology(totalConf, topology); -//set the number of acker executors; -totalConfToSave.put(Config.TOPOLOGY_ACKER_EXECUTORS, getNumOfAckerExecs(totalConf, topology)); -LOG.debug("Config.TOPOLOGY_ACKER_EXECUTORS set to: {}", totalConfToSave.get(Config.TOPOLOGY_ACKER_EXECUTORS)); +// if the Resource Aware Scheduler is used, +// we might need to set the number of acker executors and eventlogger executors to be the estimated number of workers. +if (ServerUtils.isRAS(conf)) { +int estimatedNumWorker = ServerUtils.getEstimatedWorkerCountForRASTopo(totalConf, topology); +int numAckerExecs = ObjectReader.getInt(totalConf.get(Config.TOPOLOGY_ACKER_EXECUTORS), estimatedNumWorker); +int numEventLoggerExecs = ObjectReader.getInt(totalConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS), estimatedNumWorker); + +totalConfToSave.put(Config.TOPOLOGY_ACKER_EXECUTORS, numAckerExecs); --- End diff -- No, we are not overriding the user's setting. We are overriding the default value. ---
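A minimal sketch of why an explicit user setting survives. `getIntOrDefault` is a hypothetical stand-in for `ObjectReader.getInt(value, default)`, under the assumption that an unset (`null`) value falls back to the supplied default. The estimated worker count therefore only replaces the default, and any explicit user value, including `0`, wins.

```java
import java.util.Map;

class AckerDefaults {
    static final String ACKER_KEY = "topology.acker.executors";

    /** Hypothetical stand-in for ObjectReader.getInt(value, default): null means "not set". */
    static int getIntOrDefault(Object value, int defaultValue) {
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        return Integer.parseInt(value.toString());
    }

    /** The estimated worker count is used only when the user left the setting unset. */
    static int numAckerExecs(Map<String, Object> totalConf, int estimatedNumWorker) {
        return getIntOrDefault(totalConf.get(ACKER_KEY), estimatedNumWorker);
    }
}
```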
[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2634#discussion_r182747281 --- Diff: docs/Resource_Aware_Scheduler_overview.md --- @@ -184,6 +184,10 @@ The user can set some default configurations for the Resource Aware Scheduler in topology.worker.max.heap.size.mb: 768.0 ``` +### Warning + +The number of workers will be dynamically calculated by the Resource Aware Scheduler. The `Config.TOPOLOGY_WORKERS` will not be honored. --- End diff -- +1 for the comment from @roshannaik ---
[GitHub] storm pull request #2635: [STORM-3029] don't use keytab based login for hbas...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2635#discussion_r182745958 --- Diff: external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java --- @@ -52,24 +54,27 @@ private HBaseSecurityUtil() { public static UserProvider login(Map conf, Configuration hbaseConfig) throws IOException { //Allowing keytab based login for backward compatibility. -if (UserGroupInformation.isSecurityEnabled() && (conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null || -!(((List) conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHBase.class.getName() { -LOG.info("Logging in using keytab as AutoHBase is not specified for " + TOPOLOGY_AUTO_CREDENTIALS); -//insure that if keytab is used only one login per process executed -if(legacyProvider == null) { -synchronized (HBaseSecurityUtil.class) { -if(legacyProvider == null) { -legacyProvider = UserProvider.instantiate(hbaseConfig); -String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY); -if (keytab != null) { -hbaseConfig.set(STORM_KEYTAB_FILE_KEY, keytab); -} -String userName = (String) conf.get(STORM_USER_NAME_KEY); -if (userName != null) { -hbaseConfig.set(STORM_USER_NAME_KEY, userName); +if (UserGroupInformation.isSecurityEnabled()) { +List autoCredentials = (List) conf.get(TOPOLOGY_AUTO_CREDENTIALS); +if ((autoCredentials == null) +|| (!autoCredentials.contains(AutoHBase.class.getName()) && !autoCredentials.contains(AutoTGT.class.getName( { +LOG.info("Logging in using keytab as either AutoHBase or AutoTGT is specified for " + TOPOLOGY_AUTO_CREDENTIALS); --- End diff -- nit: I think it should be neither instead of either ---
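The new condition is `!hasAutoHBase && !hasAutoTgt`, which by De Morgan is `!(hasAutoHBase || hasAutoTgt)`. The keytab branch therefore runs when *neither* plugin is configured, which is why the log message should say "neither". The sketch below restates that condition on its own. The class names are passed in as plain strings, an assumption made so the example runs without HBase or Storm on the classpath.

```java
import java.util.List;

class KeytabLoginCheck {
    /**
     * Restates the condition in the diff: fall back to keytab login only when security is on
     * and NEITHER AutoHBase NOR AutoTGT is listed in the topology's auto-credentials.
     */
    static boolean useKeytabLogin(boolean securityEnabled, List<String> autoCredentials,
                                  String autoHBaseClass, String autoTgtClass) {
        if (!securityEnabled) {
            return false;
        }
        return autoCredentials == null
            || (!autoCredentials.contains(autoHBaseClass) && !autoCredentials.contains(autoTgtClass));
    }
}
```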
[GitHub] storm pull request #2630: STORM-3024: Allow for scheduling to happen in the ...
GitHub user revans2 opened a pull request: https://github.com/apache/storm/pull/2630 STORM-3024: Allow for scheduling to happen in the background. You can merge this pull request into a Git repository by running: $ git pull https://github.com/revans2/incubator-storm STORM-3024 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2630.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2630 commit 4a9d0e2a5399647bf541ec16956472d2d795b4b1 Author: Robert (Bobby) Evans Date: 2018-04-10T18:34:20Z STORM-3024: Allow for scheduling to happen in the background. ---
[GitHub] storm issue #2622: STORM-3020: fix possible race condition in AsyncLocalizer
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2622 @HeartSaVioR thanks for the review. I addressed your concerns by making them all ConcurrentHashMaps and adding a note about why they need to be that type. I could not find a good way to remove the side effects. ---
[GitHub] storm issue #2622: STORM-3020: fix possible race condition in AsyncLocalizer
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2622 @HeartSaVioR Great catch, I forgot to update the plain Map/HashMap to a ConcurrentHashMap. Yes, the guarantees of ConcurrentMap allow for retries, and we do have side effects in some of the `compute` calls. I will also update the comments to say what requirements we have for the type. The `computeIfAbsent` calls have no side effects, so we don't need to worry about them as much. I'll see if I can come up with a way to avoid needing as strong a guarantee for `compute`, so that perhaps we could use other implementations. ---
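The distinction comes from the JDK contracts. The default `ConcurrentMap.compute` implementation may retry under contention and call the remapping function more than once. `ConcurrentHashMap` instead documents the entire `compute` invocation as atomic. A minimal sketch of declaring the field with the concrete type so that a side-effecting `compute` stays safe; `BlobRefCounts` and its reference-count side effect are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class BlobRefCounts {
    // Declared as ConcurrentHashMap, not ConcurrentMap or Map: the remapping function below has a
    // side effect, so we rely on ConcurrentHashMap performing each compute call atomically.
    private final ConcurrentHashMap<String, Integer> refCounts = new ConcurrentHashMap<>();
    private final AtomicInteger sideEffects = new AtomicInteger();

    void addReference(String blobKey) {
        refCounts.compute(blobKey, (key, count) -> {
            sideEffects.incrementAndGet(); // stands in for work that must not run twice
            return count == null ? 1 : count + 1;
        });
    }

    int count(String blobKey) {
        return refCounts.getOrDefault(blobKey, 0);
    }

    int sideEffectCount() {
        return sideEffects.get();
    }
}
```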
[GitHub] storm pull request #2623: [STORM-2687] Group Topology executors by network p...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2623#discussion_r179570567 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java --- @@ -477,45 +414,136 @@ protected String nodeToRack(RAS_Node node) { List execsScheduled = new LinkedList<>(); Map> compToExecsToSchedule = new HashMap<>(); -for (Component component : componentMap.values()) { -compToExecsToSchedule.put(component.getId(), new LinkedList()); +for (Map.Entry componentEntry: componentMap.entrySet()) { +Component component = componentEntry.getValue(); +compToExecsToSchedule.put(component.getId(), new LinkedList<>()); for (ExecutorDetails exec : component.getExecs()) { if (unassignedExecutors.contains(exec)) { compToExecsToSchedule.get(component.getId()).add(exec); +LOG.info("{} has unscheduled executor {}", component.getId(), exec); } } } -Set sortedComponents = sortComponents(componentMap); -sortedComponents.addAll(componentMap.values()); +List sortedComponents = topologicalSortComponents(componentMap); -for (Component currComp : sortedComponents) { -Map neighbors = new HashMap(); -for (String compId : Sets.union(currComp.getChildren(), currComp.getParents())) { -neighbors.put(compId, componentMap.get(compId)); +for (Component currComp: sortedComponents) { +int numExecs = compToExecsToSchedule.get(currComp.getId()).size(); +for (int i = 0; i < numExecs; i++) { +execsScheduled.addAll(takeExecutors(currComp, numExecs - i, componentMap, compToExecsToSchedule)); } -Set sortedNeighbors = sortNeighbors(currComp, neighbors); -Queue currCompExesToSched = compToExecsToSchedule.get(currComp.getId()); - -boolean flag = false; -do { -flag = false; -if (!currCompExesToSched.isEmpty()) { -execsScheduled.add(currCompExesToSched.poll()); -flag = true; -} +} + +LOG.info("The ordering result is {}", execsScheduled); + +return execsScheduled; +} -for (Component neighborComp : sortedNeighbors) { -Queue 
neighborCompExesToSched = -compToExecsToSchedule.get(neighborComp.getId()); -if (!neighborCompExesToSched.isEmpty()) { -execsScheduled.add(neighborCompExesToSched.poll()); -flag = true; +private List takeExecutors(Component currComp, int numExecs, +final Map componentMap, +final Map> compToExecsToSchedule) { +List execsScheduled = new ArrayList<>(); +Queue currQueue = compToExecsToSchedule.get((currComp.getId())); +Set sortedChildren = getSortedChildren(currComp, componentMap); + +execsScheduled.add(currQueue.poll()); --- End diff -- Can `currQueue.poll()` ever return null? How do we handle that if it does? ---
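`Queue.poll()` returns `null` when the queue is empty. If this component's queue can already have been drained (for example, by being visited as the child of another component), the code as written would add `null` to the result. Below is a minimal null-safe sketch, with a hypothetical helper name and executors modeled as strings.

```java
import java.util.List;
import java.util.Queue;

class PollGuard {
    /** Moves the head of the queue into the result, if there is one; returns whether anything moved. */
    static <T> boolean takeOne(Queue<T> queue, List<T> result) {
        T next = queue.poll(); // null means the queue was already empty
        if (next == null) {
            return false;
        }
        result.add(next);
        return true;
    }
}
```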
[GitHub] storm pull request #2623: [STORM-2687] Group Topology executors by network p...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2623#discussion_r179568920 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java --- @@ -477,45 +414,136 @@ protected String nodeToRack(RAS_Node node) { List execsScheduled = new LinkedList<>(); Map> compToExecsToSchedule = new HashMap<>(); -for (Component component : componentMap.values()) { -compToExecsToSchedule.put(component.getId(), new LinkedList()); +for (Map.Entry componentEntry: componentMap.entrySet()) { --- End diff -- Nit: we only use the Component out of this and never the key. Could we go back to just looping through the values like before? ---
[GitHub] storm pull request #2623: [STORM-2687] Group Topology executors by network p...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2623#discussion_r179569824 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java --- @@ -477,45 +414,136 @@ protected String nodeToRack(RAS_Node node) { List execsScheduled = new LinkedList<>(); Map> compToExecsToSchedule = new HashMap<>(); -for (Component component : componentMap.values()) { -compToExecsToSchedule.put(component.getId(), new LinkedList()); +for (Map.Entry componentEntry: componentMap.entrySet()) { +Component component = componentEntry.getValue(); +compToExecsToSchedule.put(component.getId(), new LinkedList<>()); for (ExecutorDetails exec : component.getExecs()) { if (unassignedExecutors.contains(exec)) { compToExecsToSchedule.get(component.getId()).add(exec); +LOG.info("{} has unscheduled executor {}", component.getId(), exec); } } } -Set sortedComponents = sortComponents(componentMap); -sortedComponents.addAll(componentMap.values()); +List sortedComponents = topologicalSortComponents(componentMap); -for (Component currComp : sortedComponents) { -Map neighbors = new HashMap(); -for (String compId : Sets.union(currComp.getChildren(), currComp.getParents())) { -neighbors.put(compId, componentMap.get(compId)); +for (Component currComp: sortedComponents) { +int numExecs = compToExecsToSchedule.get(currComp.getId()).size(); +for (int i = 0; i < numExecs; i++) { +execsScheduled.addAll(takeExecutors(currComp, numExecs - i, componentMap, compToExecsToSchedule)); } -Set sortedNeighbors = sortNeighbors(currComp, neighbors); -Queue currCompExesToSched = compToExecsToSchedule.get(currComp.getId()); - -boolean flag = false; -do { -flag = false; -if (!currCompExesToSched.isEmpty()) { -execsScheduled.add(currCompExesToSched.poll()); -flag = true; -} +} + +LOG.info("The ordering result is {}", execsScheduled); + +return execsScheduled; +} -for (Component neighborComp : sortedNeighbors) { -Queue 
neighborCompExesToSched = -compToExecsToSchedule.get(neighborComp.getId()); -if (!neighborCompExesToSched.isEmpty()) { -execsScheduled.add(neighborCompExesToSched.poll()); -flag = true; +private List takeExecutors(Component currComp, int numExecs, --- End diff -- Could you add some kind of javadoc to this explaining what it is trying to do? It is not that obvious from just the code alone. ---
[GitHub] storm pull request #2623: [STORM-2687] Group Topology executors by network p...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2623#discussion_r179570190 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java --- @@ -477,45 +414,136 @@ protected String nodeToRack(RAS_Node node) { List execsScheduled = new LinkedList<>(); Map> compToExecsToSchedule = new HashMap<>(); -for (Component component : componentMap.values()) { -compToExecsToSchedule.put(component.getId(), new LinkedList()); +for (Map.Entry componentEntry: componentMap.entrySet()) { +Component component = componentEntry.getValue(); +compToExecsToSchedule.put(component.getId(), new LinkedList<>()); for (ExecutorDetails exec : component.getExecs()) { if (unassignedExecutors.contains(exec)) { compToExecsToSchedule.get(component.getId()).add(exec); +LOG.info("{} has unscheduled executor {}", component.getId(), exec); } } } -Set sortedComponents = sortComponents(componentMap); -sortedComponents.addAll(componentMap.values()); +List sortedComponents = topologicalSortComponents(componentMap); -for (Component currComp : sortedComponents) { -Map neighbors = new HashMap(); -for (String compId : Sets.union(currComp.getChildren(), currComp.getParents())) { -neighbors.put(compId, componentMap.get(compId)); +for (Component currComp: sortedComponents) { +int numExecs = compToExecsToSchedule.get(currComp.getId()).size(); +for (int i = 0; i < numExecs; i++) { +execsScheduled.addAll(takeExecutors(currComp, numExecs - i, componentMap, compToExecsToSchedule)); } -Set sortedNeighbors = sortNeighbors(currComp, neighbors); -Queue currCompExesToSched = compToExecsToSchedule.get(currComp.getId()); - -boolean flag = false; -do { -flag = false; -if (!currCompExesToSched.isEmpty()) { -execsScheduled.add(currCompExesToSched.poll()); -flag = true; -} +} + +LOG.info("The ordering result is {}", execsScheduled); + +return execsScheduled; +} -for (Component neighborComp : sortedNeighbors) { -Queue 
neighborCompExesToSched = -compToExecsToSchedule.get(neighborComp.getId()); -if (!neighborCompExesToSched.isEmpty()) { -execsScheduled.add(neighborCompExesToSched.poll()); -flag = true; +private List takeExecutors(Component currComp, int numExecs, +final Map componentMap, +final Map> compToExecsToSchedule) { +List execsScheduled = new ArrayList<>(); +Queue currQueue = compToExecsToSchedule.get((currComp.getId())); --- End diff -- nit: there is an extra unneeded pair of '(' and ')' ---
[GitHub] storm pull request #2623: [STORM-2687] Group Topology executors by network p...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2623#discussion_r179569508 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java --- @@ -477,45 +414,136 @@ protected String nodeToRack(RAS_Node node) { List execsScheduled = new LinkedList<>(); Map> compToExecsToSchedule = new HashMap<>(); -for (Component component : componentMap.values()) { -compToExecsToSchedule.put(component.getId(), new LinkedList()); +for (Map.Entry componentEntry: componentMap.entrySet()) { +Component component = componentEntry.getValue(); +compToExecsToSchedule.put(component.getId(), new LinkedList<>()); for (ExecutorDetails exec : component.getExecs()) { if (unassignedExecutors.contains(exec)) { compToExecsToSchedule.get(component.getId()).add(exec); +LOG.info("{} has unscheduled executor {}", component.getId(), exec); } } } -Set sortedComponents = sortComponents(componentMap); -sortedComponents.addAll(componentMap.values()); +List sortedComponents = topologicalSortComponents(componentMap); -for (Component currComp : sortedComponents) { -Map neighbors = new HashMap(); -for (String compId : Sets.union(currComp.getChildren(), currComp.getParents())) { -neighbors.put(compId, componentMap.get(compId)); +for (Component currComp: sortedComponents) { +int numExecs = compToExecsToSchedule.get(currComp.getId()).size(); +for (int i = 0; i < numExecs; i++) { +execsScheduled.addAll(takeExecutors(currComp, numExecs - i, componentMap, compToExecsToSchedule)); } -Set sortedNeighbors = sortNeighbors(currComp, neighbors); -Queue currCompExesToSched = compToExecsToSchedule.get(currComp.getId()); - -boolean flag = false; -do { -flag = false; -if (!currCompExesToSched.isEmpty()) { -execsScheduled.add(currCompExesToSched.poll()); -flag = true; -} +} + +LOG.info("The ordering result is {}", execsScheduled); --- End diff -- Could we remove this too? ---
[GitHub] storm pull request #2623: [STORM-2687] Group Topology executors by network p...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2623#discussion_r179571966 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java --- @@ -477,45 +414,136 @@ protected String nodeToRack(RAS_Node node) { List execsScheduled = new LinkedList<>(); Map> compToExecsToSchedule = new HashMap<>(); -for (Component component : componentMap.values()) { -compToExecsToSchedule.put(component.getId(), new LinkedList()); +for (Map.Entry componentEntry: componentMap.entrySet()) { +Component component = componentEntry.getValue(); +compToExecsToSchedule.put(component.getId(), new LinkedList<>()); for (ExecutorDetails exec : component.getExecs()) { if (unassignedExecutors.contains(exec)) { compToExecsToSchedule.get(component.getId()).add(exec); +LOG.info("{} has unscheduled executor {}", component.getId(), exec); } } } -Set sortedComponents = sortComponents(componentMap); -sortedComponents.addAll(componentMap.values()); +List sortedComponents = topologicalSortComponents(componentMap); -for (Component currComp : sortedComponents) { -Map neighbors = new HashMap(); -for (String compId : Sets.union(currComp.getChildren(), currComp.getParents())) { -neighbors.put(compId, componentMap.get(compId)); +for (Component currComp: sortedComponents) { +int numExecs = compToExecsToSchedule.get(currComp.getId()).size(); +for (int i = 0; i < numExecs; i++) { +execsScheduled.addAll(takeExecutors(currComp, numExecs - i, componentMap, compToExecsToSchedule)); } -Set sortedNeighbors = sortNeighbors(currComp, neighbors); -Queue currCompExesToSched = compToExecsToSchedule.get(currComp.getId()); - -boolean flag = false; -do { -flag = false; -if (!currCompExesToSched.isEmpty()) { -execsScheduled.add(currCompExesToSched.poll()); -flag = true; -} +} + +LOG.info("The ordering result is {}", execsScheduled); + +return execsScheduled; +} -for (Component neighborComp : sortedNeighbors) { -Queue 
neighborCompExesToSched = -compToExecsToSchedule.get(neighborComp.getId()); -if (!neighborCompExesToSched.isEmpty()) { -execsScheduled.add(neighborCompExesToSched.poll()); -flag = true; +private List takeExecutors(Component currComp, int numExecs, +final Map componentMap, +final Map> compToExecsToSchedule) { +List execsScheduled = new ArrayList<>(); +Queue currQueue = compToExecsToSchedule.get((currComp.getId())); +Set sortedChildren = getSortedChildren(currComp, componentMap); + +execsScheduled.add(currQueue.poll()); + +for (String childId: sortedChildren) { +Component childComponent = componentMap.get(childId); +Queue childQueue = compToExecsToSchedule.get(childId); +int childNumExecs = childQueue.size(); +if (childNumExecs == 0) { +continue; +} +int numExecsToTake = 1; +if (isShuffleFromParentToChild(currComp, childComponent)) { +// if it's shuffle grouping, truncate +numExecsToTake = Math.max(1, childNumExecs / numExecs); +} // otherwise, one-by-one + +for (int i = 0; i < numExecsToTake; i++) { +execsScheduled.addAll(takeExecutors(childComponent, childNumExecs, componentMap, compToExecsToSchedule)); +} +} + +return execsScheduled; +} + +private Set getSortedChildren(Component component, final Map componentMap) { +Set children = component.getChildren(); +Set sortedChildren = +new TreeSet((o1, o2) -> { +Component child1 = componentMap.get(o1); +Component child2 = componentMap.get(o2); +boolean child1IsShuffle = isShuffleFromParentToChild(comp
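Below is a simplified, runnable model of the ordering that `takeExecutors` appears to implement, written with the kind of javadoc requested above. It rests on several assumptions that differ from the PR's code. Components are reduced to a hypothetical `Comp` holding an id and its children, with a flag per child for whether the edge is a shuffle grouping. Child ordering (`getSortedChildren`) is simplified to insertion order. The empty-queue case is guarded rather than adding `null`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical, simplified component: an id plus children, each flagged as shuffle or not. */
class Comp {
    final String id;
    // Insertion order stands in for getSortedChildren in this sketch.
    final Map<String, Boolean> childIsShuffle = new LinkedHashMap<>();

    Comp(String id) {
        this.id = id;
    }
}

class ExecutorOrdering {
    /**
     * Takes one executor of {@code curr}, then recursively pulls executors of each child so that
     * connected executors end up next to each other in the ordering. For a shuffle-grouped child,
     * about childRemaining / numExecs child executors are taken per parent executor (at least 1);
     * otherwise children are interleaved one at a time.
     *
     * @param numExecs executors of {@code curr} still to be placed, including this one
     */
    static List<String> takeExecutors(Comp curr, int numExecs, Map<String, Comp> comps,
                                      Map<String, Queue<String>> queues) {
        List<String> taken = new ArrayList<>();
        String exec = queues.get(curr.id).poll();
        if (exec != null) { // the queue may already have been drained via another parent
            taken.add(exec);
        }
        for (Map.Entry<String, Boolean> child : curr.childIsShuffle.entrySet()) {
            Queue<String> childQueue = queues.get(child.getKey());
            int childNumExecs = childQueue.size();
            if (childNumExecs == 0) {
                continue;
            }
            int numToTake = child.getValue() ? Math.max(1, childNumExecs / numExecs) : 1;
            for (int i = 0; i < numToTake; i++) {
                taken.addAll(takeExecutors(comps.get(child.getKey()), childNumExecs, comps, queues));
            }
        }
        return taken;
    }

    /** Outer loop: visit components in topological order and place each remaining executor. */
    static List<String> order(List<Comp> topoSorted, Map<String, Comp> comps,
                              Map<String, Queue<String>> queues) {
        List<String> result = new ArrayList<>();
        for (Comp c : topoSorted) {
            int numExecs = queues.get(c.id).size();
            for (int i = 0; i < numExecs; i++) {
                result.addAll(takeExecutors(c, numExecs - i, comps, queues));
            }
        }
        return result;
    }
}
```

With a parent `A` of two executors shuffling into a child `B` of four, each `A` executor is followed by two `B` executors.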
[GitHub] storm pull request #2623: [STORM-2687] Group Topology executors by network p...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2623#discussion_r179569132 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java --- @@ -477,45 +414,136 @@ protected String nodeToRack(RAS_Node node) { List execsScheduled = new LinkedList<>(); Map> compToExecsToSchedule = new HashMap<>(); -for (Component component : componentMap.values()) { -compToExecsToSchedule.put(component.getId(), new LinkedList()); +for (Map.Entry componentEntry: componentMap.entrySet()) { +Component component = componentEntry.getValue(); +compToExecsToSchedule.put(component.getId(), new LinkedList<>()); for (ExecutorDetails exec : component.getExecs()) { if (unassignedExecutors.contains(exec)) { compToExecsToSchedule.get(component.getId()).add(exec); +LOG.info("{} has unscheduled executor {}", component.getId(), exec); --- End diff -- Could we remove this please? Not sure it is needed anymore. ---
[GitHub] storm issue #2618: [STORM-2905] Fix KeyNotFoundException when kill a storm a...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2618 OK, good, I do understand the problem. I see a few ways we can make the stack trace much less likely to appear in the common case. The following are in my order of preference, but I am open to other ideas.

1) We don't delete the blobs on the nimbus side until some time after we kill the topology. Currently we delete the blobs on a timer that runs every 10 seconds by default. I would have to trace through things, but I think we may do some other deletions before that happens. If instead we kept a separate map (TOPO_X can be cleaned up after time Y), then when cleanup runs it could check that map and clean up a topology only if the topology is not in the map at all, or if it is and its time has passed.

2) We don't output the stack trace until it has failed some number of times in a row. We would still output the error if the blob was deleted when it should not have been, but it would not look like an error until the blob had been gone for 1 or 2 seconds, hopefully long enough to have actually killed the workers.

3) We have the supervisor inform the AsyncLocalizer about topologies that are in the process of being killed. Right now part of the issue with the race is that killing a worker can take a non-trivial amount of time, which makes the window in which the race can happen much larger. If the supervisors told the AsyncLocalizer as soon as they knew a topology was being killed, it could suppress errors for any topology in the process of being killed. The issue here is that informing the supervisors happens in a background thread and is not guaranteed to happen, so it might not work as often as we would like. ---
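Option 1 could look roughly like the sketch below. `DelayedBlobCleanup`, `markKilled` and `shouldCleanUp` are hypothetical names, and the grace period is an assumed parameter. The clock is injected as a `LongSupplier` so the behavior can be checked without sleeping. Cleanup proceeds when the topology is not in the map at all, or when it is and its deadline has passed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

class DelayedBlobCleanup {
    private final Map<String, Long> cleanUpAfterMs = new ConcurrentHashMap<>();
    private final long gracePeriodMs;
    private final LongSupplier clockMs;

    DelayedBlobCleanup(long gracePeriodMs, LongSupplier clockMs) {
        this.gracePeriodMs = gracePeriodMs;
        this.clockMs = clockMs;
    }

    /** Record that a topology was killed; its blobs may be removed only after the grace period. */
    void markKilled(String topoId) {
        cleanUpAfterMs.put(topoId, clockMs.getAsLong() + gracePeriodMs);
    }

    /** Called from the periodic cleanup: true if the topology's blobs may be deleted now. */
    boolean shouldCleanUp(String topoId) {
        Long deadline = cleanUpAfterMs.get(topoId);
        if (deadline == null || clockMs.getAsLong() >= deadline) {
            cleanUpAfterMs.remove(topoId);
            return true;
        }
        return false;
    }
}
```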
[GitHub] storm issue #2622: STORM-3020: fix possible race condition in AsyncLocalizer
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2622 Thanks for the review @danny0405 This was not trying to fix STORM-2905. This was a separate race condition I found when reviewing your pull request for STORM-2905. ---
[GitHub] storm issue #2618: [STORM-2905] Fix KeyNotFoundException when kill a storm a...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2618 @danny0405 I just created #2622 to fix the race condition in AsyncLocalizer. It conflicts a lot with this patch, so I wanted to make sure you saw it and had a chance to give feedback on it. I understand where the exception is coming from, but what I am saying is that making both cleanup and updateBlobs synchronized does not fix the issue. Adding the synchronization only serves to slow down other parts of the processing. Even controlling the order in which they execute is not enough, because cleanup will only happen after the scheduling change has been fully processed. Perhaps a message sequence chart would better explain the race here. ![worker being killed](https://user-images.githubusercontent.com/3441321/38334965-7a34ec3e-3822-11e8-88a4-fe1760c0f691.png) The issue is not in the order of cleanup and checking for updates. The race is between nimbus deleting the blobs and the supervisor fully processing the topology being killed. At any time after nimbus deletes the blobs from the blob store, until the supervisor has killed the workers and released all references to those blobs, we can still hit this issue. ![worker being killed bad](https://user-images.githubusercontent.com/3441321/38335209-31d7ea76-3823-11e8-968e-7e5a081f8f73.png) The above sequence is an example of this happening even if we got the ordering right. The only way to "fix" the race is to make it safe to lose the race. The current code will output an exception stack trace when it loses the race. This is not ideal, but it is safe, at least as far as I am able to determine. That is why I was asking whether the issue is just that we are outputting the stack trace, or whether something else is happening that is worse than having all of the stack traces. If it is just the stack traces, there are things we can do to address them. If it goes beyond that, then I still don't understand the issue. ---
[GitHub] storm pull request #2622: STORM-3020: fix possible race condition in AsyncLo...
GitHub user revans2 opened a pull request: https://github.com/apache/storm/pull/2622 STORM-3020: fix possible race condition in AsyncLocalizer There were a number of places in AsyncLocalizer that were using synchronized to try to protect some maps. When we added support for restarting a worker if specific blobs change, one of the places that was protected before moved to a background thread and lost that protection. Now that we are on Java 8, we can use lambdas to provide the same protection without the need for the locks, so I removed all of the coarse-grained locking in AsyncLocalizer. Now the only locking is on a per-blob basis. I tested this manually by launching topologies with lots of different types of blobs, modifying the blobs, and verifying that everything worked correctly. I also killed things, including the supervisor, several times and verified that it was able to recover in each case. You can merge this pull request into a Git repository by running: $ git pull https://github.com/revans2/incubator-storm STORM-3020 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2622.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2622 commit 337aef8f1291aba0ab228f6e9e0800c19b8c5ceb Author: Robert (Bobby) Evans Date: 2018-04-03T19:58:14Z STORM-3020: fix possible race condition in AsyncLocalizer ---
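Here is a minimal sketch of replacing a coarse `synchronized` method with per-blob locking. It assumes a hypothetical `LocalBlob` placeholder rather than the real AsyncLocalizer types. `ConcurrentHashMap.computeIfAbsent` creates each entry at most once per key without a global lock. Any further work synchronizes only on that blob's own object, so work on different blobs does not block.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class PerBlobLocking {
    /** Hypothetical placeholder for a localized blob; only its own monitor is ever locked. */
    static final class LocalBlob {
        final String key;
        int references = 0; // guarded by this LocalBlob's monitor

        LocalBlob(String key) {
            this.key = key;
        }
    }

    private final ConcurrentHashMap<String, LocalBlob> blobs = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    LocalBlob addReference(String key) {
        // Atomic get-or-create: the factory runs at most once per key, with no global lock.
        LocalBlob blob = blobs.computeIfAbsent(key, k -> {
            created.incrementAndGet();
            return new LocalBlob(k);
        });
        synchronized (blob) { // per-blob lock: other blobs are unaffected
            blob.references++;
        }
        return blob;
    }

    int references(String key) {
        LocalBlob blob = blobs.get(key);
        if (blob == null) {
            return 0;
        }
        synchronized (blob) {
            return blob.references;
        }
    }

    int createdCount() {
        return created.get();
    }
}
```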
[GitHub] storm issue #2618: [STORM-2905] Fix KeyNotFoundException when kill a storm a...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2618 Just FYI, I filed STORM-3020 to address the race that I just found. ---
[GitHub] storm issue #2618: [STORM-2905] Fix KeyNotFoundException when kill a storm a...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2618 @danny0405 {{updateBlobs}} does not need to be guarded by a lock. This is what I meant about the code being complex. {{requestDownloadBaseTopologyBlobs}} is protected by a lock simply because of this non-thread-safe code. https://github.com/apache/storm/blob/402a371ccdb39ccd7146fe9743e91ca36fee6d15/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java#L222-L226 Parts of this were written prior to the move to Java 8, so {{computeIfAbsent}} was not available. Now that it is, we could use it and, I believe, remove the lock, but I would want to spend some time making sure the lock was not accidentally protecting something else in there too. {{requestDownloadTopologyBlobs}} looks like it does not need to be synchronized at all. That must have been a mistake on my part, but the lock does look like it might be protecting against a bug in https://github.com/apache/storm/blob/402a371ccdb39ccd7146fe9743e91ca36fee6d15/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java#L191-L195 which executes outside of a lock but does not look thread safe. Declaring {{updateBlobs}} as synchronized does absolutely nothing except make it contend with {{requestDownloadTopologyBlobs}} and {{requestDownloadBaseTopologyBlobs}}. And if we are able to remove the locks there, then it will not be an issue at all. {{updateBlobs}} is scheduled using {{scheduleWithFixedDelay}} https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay(java.lang.Runnable,%20long,%20long,%20java.util.concurrent.TimeUnit) The javadoc clearly states that the next execution only starts a fixed delay after the previous one has finished, so there will only ever be one copy of it running at a time. Additionally, everything it does is already asynchronous, so the actual work happens on separate threads anyway.
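The `scheduleWithFixedDelay` guarantee can be checked with a small self-contained experiment (the class name `FixedDelayDemo` is made up for this sketch). Even on a four-thread pool, a task whose body takes longer than its delay never overlaps with itself, so adding `synchronized` to such a task buys no extra mutual exclusion against itself.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedDelayDemo {
    /**
     * Schedules a task that takes longer than its delay on a 4-thread pool and returns the
     * largest number of copies of that task that were ever running at the same time.
     */
    static int maxConcurrentRuns(int runsToObserve) {
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(4);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        exec.scheduleWithFixedDelay(() -> {
            int now = running.incrementAndGet();
            maxSeen.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(20); // simulated work, much longer than the 1 ms delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
                completed.incrementAndGet();
            }
        }, 0, 1, TimeUnit.MILLISECONDS);
        try {
            while (completed.get() < runsToObserve) {
                Thread.sleep(5);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            exec.shutdownNow();
        }
        return maxSeen.get();
    }
}
```

The pool size does not matter here: the next run is only scheduled once the previous one has returned.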
Making it synchronized would just slow things down. Having a blob disappear at the wrong time is a race that will always be in the system, and we cannot fix it with synchronization because it happens across separate servers. The only thing we can do is deal with it when it happens. The current code deals with it by trying again later. This means that a worker that is coming up for the first time will not come up until the blob is fully downloaded, but if we are trying to update the blob and it has disappeared, we simply keep the older version around until we no longer need it. Yes, we may log some exceptions while we do it, but that is the worst thing that will happen. ---
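The retry policy described above (wait for the first download, but keep the old copy when an update finds the blob gone) might look roughly like this. `RemoteBlobStore` and `BlobVersionTracker` are hypothetical; the real supervisor code goes through Storm's blob store client, where a missing blob surfaces as an exception (such as the KeyNotFoundException in this JIRA's title) rather than an empty `Optional`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical view of the remote blob store; Storm's real client API differs.
interface RemoteBlobStore {
    /** Current remote version of the blob, or empty if it has been deleted. */
    Optional<Long> remoteVersion(String key);
}

// Hypothetical sketch of the "safe to lose the race" policy. Not thread safe:
// it assumes a single caller thread, like a task run by scheduleWithFixedDelay.
class BlobVersionTracker {
    private final Map<String, Long> localVersions = new HashMap<>();

    /**
     * First download for a new worker. Returns false when the blob is missing so the
     * caller retries later; the worker does not start until this succeeds.
     */
    boolean tryInitialDownload(RemoteBlobStore store, String key) {
        Optional<Long> remote = store.remoteVersion(key);
        remote.ifPresent(v -> localVersions.put(key, v));
        return remote.isPresent();
    }

    /**
     * Periodic update check. If the blob vanished (for example, nimbus deleted it while the
     * topology is being killed), keep the version we already have instead of failing.
     * Returns the version in use afterwards, or -1 if nothing has been downloaded.
     */
    long updateOrKeep(RemoteBlobStore store, String key) {
        Long current = localVersions.get(key);
        Optional<Long> remote = store.remoteVersion(key);
        if (remote.isPresent() && (current == null || remote.get() > current)) {
            localVersions.put(key, remote.get());
            return remote.get();
        }
        return current == null ? -1L : current; // lost the race: keep the older version
    }
}
```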
[GitHub] storm pull request #2618: [STORM-2905] Fix KeyNotFoundException when kill a ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2618#discussion_r178852812 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java --- @@ -497,7 +497,6 @@ static DynamicState cleanupCurrentContainer(DynamicState dynamicState, StaticSta assert(dynamicState.container.areAllProcessesDead()); dynamicState.container.cleanUp(); - staticState.localizer.releaseSlotFor(dynamicState.currentAssignment, staticState.port); --- End diff -- That is not true. https://github.com/danny0405/storm/blob/a4e659b5073794396ea23e3dd7b79c00536fc3fe/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java#L505-L511 The code first removes the base blobs' reference counts, but then it decrements a reference count for the other blobs too. ---
[GitHub] storm issue #2618: [STORM-2905] Fix KeyNotFoundException when kill a storm a...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2618 I am trying to understand the reasons behind this change. Is this JIRA just to remove an exception that shows up in the logs? Or is that exception actually causing a problem? I ask because this is a risk vs. reward trade-off. The code in AsyncLocalizer is very complicated, and because it is asynchronous there are lots of races and corner cases. This makes me a bit nervous about changing fundamental things just because of some extra logs. Additionally, this is a distributed system and this particular race is inherent in it. Someone can delete a blob at any point in time, and the code in the supervisor needs to handle that. ---
[GitHub] storm pull request #2618: [STORM-2905] Fix KeyNotFoundException when kill a ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2618#discussion_r178568732 --- Diff: storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java --- @@ -94,17 +93,17 @@ public void cleanup(ClientBlobStore store) { Map.Entry> rsrc = i.next(); LocallyCachedBlob resource = rsrc.getKey(); try { -resource.getRemoteVersion(store); +if (!store.isRemoteBlobExists(resource.getKey())) { --- End diff -- Admittedly the code is cleaner with this, but the change is not needed for this fix: it behaves exactly the same as before. I think this is a good change; it would just be nice to have it in a separate pull request and a separate JIRA, as it is not really part of the needed fix. ---
[GitHub] storm pull request #2618: [STORM-2905] Fix KeyNotFoundException when kill a ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2618#discussion_r178567096 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java --- @@ -497,7 +497,6 @@ static DynamicState cleanupCurrentContainer(DynamicState dynamicState, StaticSta assert(dynamicState.container.areAllProcessesDead()); dynamicState.container.cleanUp(); - staticState.localizer.releaseSlotFor(dynamicState.currentAssignment, staticState.port); --- End diff -- cleanupCurrentContainer is used in many different places, not just during the blob update. We need to release the slot when the container is killed, or the reference counting will be off. ---
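The reference-counting concern can be illustrated with a simplified model in which each blob records which slots (ports) use it. This is a hypothetical sketch, not AsyncLocalizer's implementation. If `releaseSlot` is skipped on any cleanup path, the blob never becomes unused and can never be cleaned up.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of per-slot blob references; not AsyncLocalizer's actual code.
class SlotBlobReferences {
    // blob key -> ports (slots) that currently reference the blob
    private final Map<String, Set<Integer>> refs = new HashMap<>();

    void acquire(int port, List<String> blobKeys) {
        for (String key : blobKeys) {
            refs.computeIfAbsent(key, k -> new HashSet<>()).add(port);
        }
    }

    // Must run on every path that tears down the container on this port,
    // not only when a blob update restarts the worker.
    void releaseSlot(int port, List<String> blobKeys) {
        for (String key : blobKeys) {
            Set<Integer> ports = refs.get(key);
            if (ports == null) {
                continue;
            }
            ports.remove(port);
            if (ports.isEmpty()) {
                refs.remove(key);
            }
        }
    }

    // Only blobs that no slot references are candidates for cleanup.
    boolean isUnused(String key) {
        return !refs.containsKey(key);
    }
}
```

Tracking a set of ports instead of a bare counter makes a duplicate release harmless, which is one way to keep the counts from drifting.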
[GitHub] storm issue #2591: STORM-2979: WorkerHooks EOFException during run_worker_sh...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2591 This works and I am OK with merging it in. +1. That said, deserializing the hooks twice feels really odd and non-intuitive to me, but that is not your problem, so it should be fine. Please do put up a pull request for the master branch too. I think this should probably apply to the other 1.x branches without any changes. ---
[GitHub] storm issue #2604: STORM-3006-updated-DRPC-docs
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2604 The docs are not identical for master, so I will manually make some updates to master, which is allowed without code review. ---
[GitHub] storm issue #2607: STORM-3011 Use default bin path in flight.bash if $JAVA_H...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2607 @jnioche I was able to cherry-pick #2609 to all of the branches, but I missed this one. Please close this pull request, and sorry for the inconvenience. ---
[GitHub] storm issue #2433: [STORM-2693] Heartbeats and assignments promotion for sto...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2433 I am +1. I am a little nervous that the tests fail consistently on Travis in exactly the same way but never on my laptop, but I think we can work that out later if it turns out to be a real issue. ---
[GitHub] storm issue #2433: [STORM-2693] Heartbeats and assignments promotion for sto...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2433 @danny0405 the changes look good, the conflicts are minimal and I think the test failures are spurious. I am +1 for merging this in. Please rebase/squash the commits, resolve the minor conflicts and I will be happy to merge it in. I really would like to have someone else give a +1 for the patch too as I made some of the changes myself. ---
[GitHub] storm issue #2433: [STORM-2693] Heartbeats and assignments promotion for sto...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2433 @danny0405 Sorry about how long this has taken. I am back from vacation now. I will take a look at the patch again, and if the conflicts are small hopefully we can merge it in today or tomorrow. ---
[GitHub] storm issue #2433: [STORM-2693] Heartbeats and assignments promotion for sto...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2433 @danny0405 I created https://github.com/danny0405/storm/pull/2 to add in authorization support for the supervisor and the new nimbus APIs. ---
[GitHub] storm issue #2433: [STORM-2693] Heartbeats and assignments promotion for sto...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2433 OK I'll try to put up another pull request to cover basic auth for the supervisor. ---
[GitHub] storm pull request #2433: [STORM-2693] Heartbeats and assignments promotion ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2433#discussion_r173232562 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4215,7 +4474,48 @@ public boolean isTopologyNameAllowed(String name) throws AuthorizationException, throw new RuntimeException(e); } } - + +@Override +public SupervisorAssignments getSupervisorAssignments(String node) throws AuthorizationException, TException { +try { --- End diff -- It would be good to have some authorization checks here. This is not super critical because the data is public, but it would be good to tie it into our existing authentication system. ---
[GitHub] storm pull request #2433: [STORM-2693] Heartbeats and assignments promotion ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2433#discussion_r173232056 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java --- @@ -234,6 +295,60 @@ public void launchDaemon() { } } +private void launchSupervisorThriftServer(Map conf) throws IOException { +// validate port +int port = getThriftServerPort(); +try { +ServerSocket socket = new ServerSocket(port); +socket.close(); +} catch (BindException e) { +LOG.error("{} is not available. Check if another process is already listening on {}", port, port); +throw new RuntimeException(e); +} + +TProcessor processor = new org.apache.storm.generated.Supervisor.Processor( +new org.apache.storm.generated.Supervisor.Iface() { +@Override +public void sendSupervisorAssignments(SupervisorAssignments assignments) +throws AuthorizationException, TException { +LOG.info("Got an assignments from master, will start to sync with assignments: {}", assignments); +SynchronizeAssignments syn = new SynchronizeAssignments(getSupervisor(), assignments, getReadClusterState()); +getEventManger().add(syn); +} + +@Override +public Assignment getLocalAssignmentForStorm(String id) +throws NotAliveException, AuthorizationException, TException { +Assignment assignment = getStormClusterState().assignmentInfo(id, null); +if (null == assignment) { +throw new NotAliveException("No local assignment assigned for storm: " + id + " for node: " + getHostName()); +} +return assignment; +} + +@Override +public void sendSupervisorWorkerHeartbeat(SupervisorWorkerHeartbeat heartbeat) +throws AuthorizationException, TException { +// do nothing now +} --- End diff -- Sorry I forgot this before. All of these must have some kind of authorization checks. We have authenticated the user connecting, but right now anyone with valid Kerberos credentials or a valid WorkerToken can call these APIs. 
We need something that can block users who should not be calling them, with the ability to turn it off for a non-secure cluster. `sendSupervisorAssignments` is the biggest security problem; it needs to be restricted so that only nimbus can make that call. `getLocalAssignmentForStorm` is probably okay to leave totally open, but it might be good to restrict it to just the owner of that topology and nimbus. The same goes for `sendSupervisorWorkerHeartbeat`: it is a no-op right now, so it is not a big deal, but in the future I would expect us to want to restrict it. Please take a look at how nimbus does these checks. ---
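One possible shape for these checks, assuming a simple on/off switch and a known set of nimbus principals. Everything here is hypothetical (nimbus makes these decisions through its pluggable authorizer, which a real fix would presumably reuse), and `SecurityException` stands in for the Thrift `AuthorizationException`.

```java
import java.util.Set;

// Hypothetical authorization guard; names are illustrative, not Storm's authorizer API.
// SecurityException stands in for the Thrift-generated AuthorizationException.
class SupervisorAuthGuard {
    private final boolean authorizationEnabled;
    private final Set<String> nimbusUsers;

    SupervisorAuthGuard(boolean authorizationEnabled, Set<String> nimbusUsers) {
        this.authorizationEnabled = authorizationEnabled;
        this.nimbusUsers = nimbusUsers;
    }

    /** For sendSupervisorAssignments: only nimbus may push assignments. */
    void checkSendAssignments(String caller) {
        if (!authorizationEnabled) {
            return; // non-secure cluster: checks turned off
        }
        if (!nimbusUsers.contains(caller)) {
            throw new SecurityException(caller + " is not allowed to send assignments");
        }
    }

    /** For getLocalAssignmentForStorm and heartbeats: topology owner or nimbus. */
    void checkTopologyAccess(String caller, String topologyOwner) {
        if (!authorizationEnabled) {
            return;
        }
        if (!caller.equals(topologyOwner) && !nimbusUsers.contains(caller)) {
            throw new SecurityException(caller + " may not access topologies owned by " + topologyOwner);
        }
    }
}
```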
[GitHub] storm pull request #2433: [STORM-2693] Heartbeats and assignments promotion ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2433#discussion_r173219240 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -319,14 +344,17 @@ private static StormBase make(TopologyStatus status) { private static final TopologyStateTransition REMOVE_TRANSITION = (args, nimbus, topoId, base) -> { LOG.info("Killing topology: {}", topoId); IStormClusterState state = nimbus.getStormClusterState(); +Assignment oldAss = state.assignmentInfo(topoId, null); --- End diff -- Can we rename this variable? It might be offensive to some people. ---
[GitHub] storm pull request #2433: [STORM-2693] Heartbeats and assignments promotion ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2433#discussion_r173220578 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java --- @@ -293,14 +268,15 @@ public synchronized void run() { } if (hasShared) { localAssignment.set_total_node_shared(amountShared); - } + } --- End diff -- nit: spacing. ---
[GitHub] storm pull request #2433: [STORM-2693] Heartbeats and assignments promotion ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2433#discussion_r173218011 --- Diff: storm-client/src/jvm/org/apache/storm/utils/SupervisorClient.java --- @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.utils; + +import org.apache.storm.Config; +import org.apache.storm.generated.Supervisor; +import org.apache.storm.security.auth.ThriftClient; +import org.apache.storm.security.auth.ThriftConnectionType; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class SupervisorClient extends ThriftClient { +private Supervisor.Client _client; --- End diff -- nit: let's follow convention and not use a leading _ in field names. ---
[GitHub] storm pull request #2433: [STORM-2693] Heartbeats and assignments promotion ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2433#discussion_r173216785 --- Diff: storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java --- @@ -295,7 +295,11 @@ private Scope calculateScope(Map taskToNodePort, Map getHostToRackMapping(Map taskToNodePort) { Set hosts = new HashSet(); for (int task: targetTasks) { -hosts.add(taskToNodePort.get(task).get_node()); +//if this task containing worker will be killed by a assignments sync, +//taskToNodePort will be an empty map which is refreshed by WorkerState +if (taskToNodePort.containsKey(task)) { +hosts.add(taskToNodePort.get(task).get_node()); +} --- End diff -- Nit: could we log an error if it does not contain the task? I just want to be sure that we know something possibly unexpected has happened here. ---
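A sketch of the requested logging, using `java.util.logging` to stay self-contained (Storm itself logs through SLF4J) and a plain `Map<Integer, String>` in place of the task-to-`NodeInfo` map. `HostLookup` is a hypothetical name. A single `get` plus a null check also avoids looking the task up twice.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

class HostLookup {
    private static final Logger LOG = Logger.getLogger(HostLookup.class.getName());

    /** taskToNode: task id -> host name (simplified stand-in for the NodeInfo map). */
    static Set<String> hostsFor(List<Integer> targetTasks, Map<Integer, String> taskToNode) {
        Set<String> hosts = new HashSet<>();
        for (int task : targetTasks) {
            String node = taskToNode.get(task);
            if (node == null) {
                // Can happen while an assignment sync is killing this worker, but log it
                // so an unexpected missing task does not go unnoticed.
                LOG.severe("No node found for task " + task + "; ignoring it for host-to-rack mapping");
                continue;
            }
            hosts.add(node);
        }
        return hosts;
    }
}
```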
[GitHub] storm pull request #2433: [STORM-2693] Heartbeats and assignments promotion ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2433#discussion_r173213537 --- Diff: storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java --- @@ -233,23 +249,11 @@ * @return the id of the topology or null if it is not alive. */ default Optional getTopoId(final String topologyName) { --- End diff -- I am a little confused why we need 2 methods that appear to do the exact same thing. Could we try to have just one of them? ---
[GitHub] storm pull request #2433: [STORM-2693] Heartbeats and assignments promotion ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2433#discussion_r173217779 --- Diff: storm-client/src/jvm/org/apache/storm/utils/SupervisorClient.java --- @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.utils; + +import org.apache.storm.Config; +import org.apache.storm.generated.Supervisor; +import org.apache.storm.security.auth.ThriftClient; +import org.apache.storm.security.auth.ThriftConnectionType; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class SupervisorClient extends ThriftClient { +private Supervisor.Client _client; +private static final Logger LOG = LoggerFactory.getLogger(SupervisorClient.class); + +public static SupervisorClient getConfiguredClient(Map conf, String host) { +int port = Integer.parseInt(conf.get(Config.SUPERVISOR_THRIFT_PORT).toString()); +return getConfiguredClientAs(conf, host, port, null); +} + +public static SupervisorClient getConfiguredClient(Map conf, String host, int port) { +return getConfiguredClientAs(conf, host, port, null); +} + +public static SupervisorClient getConfiguredClientAs(Map conf, String host, int port, String asUser) { --- End diff -- I don't think we want to support this right now as we don't have a use case for it. ---
[GitHub] storm pull request #2433: [STORM-2693] Heartbeats and assignments promotion ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2433#discussion_r173213078 --- Diff: storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java --- @@ -94,6 +108,8 @@ @Deprecated List backpressureTopologies(); +NimbusInfo getLeader(Runnable callback); --- End diff -- I am a little wary of using this vs. going directly to the leader elector. The issue really is one of fencing. In some cases it is possible for the leader elector to lose leadership while the other nodes in ZK are not updated for a while. I am fine with this API so long as it has proper javadocs explaining that it cannot be used for fencing and is only for informational purposes. ---
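The javadoc being asked for might read something like the following. `LeaderInfoSource` is a hypothetical interface and `Optional<String>` stands in for the `NimbusInfo` return type in the diff; the wording of the comment is the point here.

```java
import java.util.Optional;

// Hypothetical interface; only the javadoc wording is meant to carry over.
interface LeaderInfoSource {
    /**
     * Returns the nimbus leader as last recorded in ZooKeeper, for informational
     * purposes only (for example, to display in the UI or to pick a host to contact).
     *
     * <p>This value can be stale: a nimbus may lose leadership before the data in
     * ZooKeeper is updated. Do NOT use it for fencing. Code that must only run on the
     * leader should ask the leader elector directly.
     *
     * @param callback notified when the recorded leader changes (assumed nullable in this sketch)
     * @return the last recorded leader host, or empty if none is recorded
     */
    Optional<String> getLeader(Runnable callback);
}
```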
[GitHub] storm pull request #2433: [STORM-2693] Heartbeats and assignments promotion ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2433#discussion_r173207181 --- Diff: storm-client/src/jvm/org/apache/storm/assignments/InMemoryAssignmentBackend.java --- @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.assignments; + +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.generated.Assignment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An assignment backend which will keep all assignments and id-info in memory. Only used if no backend is specified internal. + */ +public class InMemoryAssignmentBackend implements ILocalAssignmentsBackend { +private static final Logger LOG = LoggerFactory.getLogger(InMemoryAssignmentBackend.class); + +protected Map idToAssignment; +protected Map idToName; +protected Map nameToId; +private volatile boolean isSynchronized = false; + +public InMemoryAssignmentBackend() {} --- End diff -- nit: you could just delete this line and get the same result. ---
[GitHub] storm pull request #2433: [STORM-2693] Heartbeats and assignments promotion ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2433#discussion_r173214066 --- Diff: storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java --- @@ -745,6 +828,7 @@ public void disconnect() { stateStorage.unregister(stateId); if (solo) { stateStorage.close(); +this.assignmentsBackend.dispose(); --- End diff -- nit: spacing. ---
[GitHub] storm pull request #2433: [STORM-2693] Heartbeats and assignments promotion ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2433#discussion_r173210647 --- Diff: storm-client/src/jvm/org/apache/storm/assignments/InMemoryAssignmentBackend.java --- @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.assignments; + +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.generated.Assignment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An assignment backend which will keep all assignments and id-info in memory. Only used if no backend is specified internal. 
+ */ +public class InMemoryAssignmentBackend implements ILocalAssignmentsBackend { +private static final Logger LOG = LoggerFactory.getLogger(InMemoryAssignmentBackend.class); + +protected Map idToAssignment; +protected Map idToName; +protected Map nameToId; +private volatile boolean isSynchronized = false; + +public InMemoryAssignmentBackend() {} + +@Override +public boolean isSynchronized() { +return this.isSynchronized; +} + +@Override +public void setSynchronized() { +this.isSynchronized = true; +} + +@Override +public void prepare(Map conf) { +// do nothing for conf now +this.idToAssignment = new ConcurrentHashMap<>(); +this.idToName = new ConcurrentHashMap<>(); +this.nameToId = new ConcurrentHashMap<>(); +} + +@Override +public void keepOrUpdateAssignment(String stormId, Assignment assignment) { +this.idToAssignment.put(stormId, assignment); +} + +@Override +public Assignment getAssignment(String stormId) { +return this.idToAssignment.get(stormId); +} + +@Override +public void removeAssignment(String stormId) { +this.idToAssignment.remove(stormId); +} + +@Override +public List assignments() { +if(idToAssignment == null) { +return new ArrayList<>(); +} +List ret = new ArrayList<>(); +ret.addAll(this.idToAssignment.keySet()); +return ret; +} + +@Override +public Map assignmentsInfo() { +Map ret = new HashMap<>(); +ret.putAll(this.idToAssignment); + +return ret; +} + +@Override +public void syncRemoteAssignments(Map remote) { +Map tmp = new ConcurrentHashMap<>(); --- End diff -- The issue I see is that all of this does not appear to be thread safe, even with the ConcurrentHashMap. `idToAssignment`, `idToName`, and `nameToId` are all set and read in different methods that can apparently be called from different threads, so there is a real possibility that their state will be inconsistent. If we don't care too much about that it might be okay, but if we are going with an eventual-consistency-like approach for this, we should at least document it. ---
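If consistent views are wanted rather than documented eventual consistency, one option is to keep all three maps in a single immutable snapshot and swap it atomically. This is a hypothetical sketch (`String` stands in for the generated `Assignment` type), trading a full copy on every write for readers that always see `idToAssignment`, `idToName` and `nameToId` in agreement.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical backend; String stands in for Storm's generated Assignment type.
class SnapshotAssignmentBackend {
    // All three views live in one immutable object, so readers never see them out of sync.
    private static final class Snapshot {
        final Map<String, String> idToAssignment;
        final Map<String, String> idToName;
        final Map<String, String> nameToId;

        Snapshot(Map<String, String> idToAssignment, Map<String, String> idToName) {
            this.idToAssignment = Collections.unmodifiableMap(new HashMap<>(idToAssignment));
            this.idToName = Collections.unmodifiableMap(new HashMap<>(idToName));
            Map<String, String> inverse = new HashMap<>();
            idToName.forEach((id, name) -> inverse.put(name, id));
            this.nameToId = Collections.unmodifiableMap(inverse);
        }
    }

    private final AtomicReference<Snapshot> current =
        new AtomicReference<>(new Snapshot(Collections.emptyMap(), Collections.emptyMap()));

    // Replace everything at once, e.g. after pulling the full state from nimbus.
    void syncRemote(Map<String, String> assignments, Map<String, String> idToName) {
        current.set(new Snapshot(assignments, idToName));
    }

    // Copy, modify, and swap; updateAndGet retries on contention, so no update is lost.
    void keepOrUpdate(String id, String name, String assignment) {
        current.updateAndGet(old -> {
            Map<String, String> a = new HashMap<>(old.idToAssignment);
            Map<String, String> n = new HashMap<>(old.idToName);
            a.put(id, assignment);
            n.put(id, name);
            return new Snapshot(a, n);
        });
    }

    String assignmentForName(String name) {
        Snapshot s = current.get(); // read every view from the same snapshot
        String id = s.nameToId.get(name);
        return id == null ? null : s.idToAssignment.get(id);
    }
}
```

Copy-on-write is probably fine for the modest number of topologies a supervisor tracks, but that is an assumption worth checking before adopting it.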
[GitHub] storm pull request #2433: [STORM-2693] Heartbeats and assignments promotion ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2433#discussion_r173215759 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java --- @@ -392,6 +401,30 @@ public void establishLogSettingCallback() { workerState.stormClusterState.topologyLogConfig(topologyId, this::checkLogConfigChanged); } +/** + * Send a heartbeat to local supervisor first to check if supervisor is ok for heartbeating. + */ +private void heartbeatToMasterIfLocalbeatFail(LSWorkerHeartbeat lsWorkerHeartbeat) { +if (ConfigUtils.isLocalMode(this.conf)) { +return; +} +//in distributed mode, send heartbeat directly to master if local supervisor goes down +SupervisorWorkerHeartbeat workerHeartbeat = new SupervisorWorkerHeartbeat(lsWorkerHeartbeat.get_topology_id(), +lsWorkerHeartbeat.get_executors(), lsWorkerHeartbeat.get_time_secs()); +try (SupervisorClient client = SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPort)){ + client.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat); +} catch (Throwable tr1) { +//if any error/exception thrown, report directly to nimbus. +LOG.warn("Exception when send heartbeat to local supervisor", tr1.getMessage()); +try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(conf)){ + nimbusClient.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat); +} catch (Throwable tr2) { --- End diff -- Here too on Exception vs Throwable. ---
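A sketch of the supervisor-then-nimbus fallback with `Exception` rather than `Throwable` in the catch clauses, which appears to be the point of this nit. `HeartbeatSender` is a hypothetical stand-in for the two Thrift clients. Passing the exception object to the logger also sidesteps a problem in the quoted `LOG.warn` call: it passes `tr1.getMessage()` without a `{}` placeholder, so SLF4J never prints that message.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the supervisor and nimbus Thrift clients.
interface HeartbeatSender {
    void send(String heartbeat) throws Exception;
}

class HeartbeatFallback {
    private static final Logger LOG = Logger.getLogger(HeartbeatFallback.class.getName());

    /**
     * Tries the local supervisor first and falls back to nimbus. Catching Exception rather
     * than Throwable lets Errors (OutOfMemoryError, StackOverflowError, ...) propagate.
     * Returns which target accepted the heartbeat, or "none".
     */
    static String send(String heartbeat, HeartbeatSender supervisor, HeartbeatSender nimbus) {
        try {
            supervisor.send(heartbeat);
            return "supervisor";
        } catch (Exception e1) {
            // Pass the exception itself so the cause is logged, not just dropped.
            LOG.log(Level.WARNING, "Failed to send heartbeat to local supervisor; trying nimbus", e1);
            try {
                nimbus.send(heartbeat);
                return "nimbus";
            } catch (Exception e2) {
                LOG.log(Level.WARNING, "Failed to send heartbeat to nimbus", e2);
                return "none";
            }
        }
    }
}
```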
[GitHub] storm pull request #2433: [STORM-2693] Heartbeats and assignments promotion ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2433#discussion_r173215187 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java --- @@ -392,6 +401,30 @@ public void establishLogSettingCallback() { workerState.stormClusterState.topologyLogConfig(topologyId, this::checkLogConfigChanged); } +/** + * Send a heartbeat to local supervisor first to check if supervisor is ok for heartbeating. + */ +private void heartbeatToMasterIfLocalbeatFail(LSWorkerHeartbeat lsWorkerHeartbeat) { +if (ConfigUtils.isLocalMode(this.conf)) { --- End diff -- If you want to make this work in local mode we can do it. For nimbus we have an override in NimbusClient so it will return the local nimbus instead. We could do the same thing for a Supervisor with a specific port number too. That would make local mode look and act a lot more like non-local mode. But it is something we can do in a follow on JIRA. ---
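The local-mode idea could look roughly like this: a per-port override consulted before opening a network connection. All names are hypothetical and only illustrate the pattern; they are not the actual `NimbusClient` override or any existing Storm API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical minimal client API; stands in for the Thrift Supervisor.Iface.
interface SupervisorApi {
    String describe();
}

// Hypothetical factory illustrating a per-port local-mode override.
final class SupervisorClientFactory {
    private static final Map<Integer, SupervisorApi> LOCAL_OVERRIDES = new ConcurrentHashMap<>();

    private SupervisorClientFactory() {}

    /** A local-mode cluster registers its in-process supervisor under its port. */
    static void setLocalOverride(int port, SupervisorApi local) {
        LOCAL_OVERRIDES.put(port, local);
    }

    static void clearLocalOverride(int port) {
        LOCAL_OVERRIDES.remove(port);
    }

    static SupervisorApi getClient(String host, int port) {
        SupervisorApi local = LOCAL_OVERRIDES.get(port);
        if (local != null) {
            return local; // local mode: same code path as distributed mode, no network
        }
        // Placeholder for opening a real Thrift connection to host:port.
        return () -> "remote supervisor at " + host + ":" + port;
    }
}
```

With something like this in place, the worker's heartbeat code would not need its `isLocalMode` early return.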
[GitHub] storm pull request #2433: [STORM-2693] Heartbeats and assignments promotion ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2433#discussion_r173212518 --- Diff: storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java --- @@ -44,6 +44,16 @@ Assignment assignmentInfo(String stormId, Runnable callback); +Assignment remoteAssignmentInfo(String stormId, Runnable callback); --- End diff -- nit: could we get javadocs for the new methods? I know we don't have them for any of the others, but I think it would help long term to know what the APIs are for and which daemon is supposed to call them. This is especially true for stormId, which translates a topology name to a topology id even though getTopoId already does the same thing. It would be nice to know why that is the case. ---