[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-82196521 I will ask Ted to close this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user miguno commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-82217610 @clockfly We may also want to check why we don't have the permissions to close this PR ourselves. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user miguno commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-81736347 @d2r And btw I don't have permissions on GitHub to close this PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user d2r commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-81733977 @miguno, @tedxia I am scanning through open pull requests: It seems #429 has been merged instead of this pull request and STORM-329 has been resolved. If this is correct, and we no longer need these changes, can we close this pull request? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-73909944 @miguno, I am sure you are aware that you can still send data to this channel even if channel.isWritable return false. check http://netty.io/4.0/api/io/netty/channel/Channel.html#isWritable() boolean isWritable() Returns true if and only if the I/O thread will perform the requested write operation **immediately**. Any write requests made when this method returns false are **queued** until the I/O thread is ready to process the queued write requests. doing or not doing isWritable check have nothing to do with message loss, the isWritable check is purely used to optimize the performance for small messages. case1: If we have a large enough batch, then we will just flush to netty internal queue. Netty will flush pending data in queueto wire when the wire is not that busy. case2: If we don't have a large enough batch, and the wire is busy, then the Storm netty Client will wait for a while, buffer more messages in the batch, and set a timer to flush later. case3: If we don't have a large enough batch, and the wire is free, then Storm Client will flush immediately, so that we have better latency. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-73910187 So, I don't think your TODO comment is a issue, it is actually designed like this, how do you think? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user miguno commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-73667341 Many thanks for your review, Sean. I addressed your comments, see the new commits in https://github.com/miguno/storm/commits/0.10.0-SNAPSHOT-STORM-392-miguno-merge. One final question: How should we address the following TODO in `send(IteratorTaskMessage msgs)`? I'd appreciate additional eyeballs on this. :-) It's a scenario that we may or may not have overlooked in the original code. Note how we are NOT checking for a `WRITABLE` channel while flushing (full) message batches (cf. the `while` loop), but we do check for a `WRITABLE` channel when handling any left-over messages (cf. after the `while` loop). ```java /** * Enqueue task messages to be sent to the remote destination (cf. `host` and `port`). */ @Override public synchronized void send(IteratorTaskMessage msgs) { // ...some code removed to shorten this code snippet... Channel channel = channelRef.get(); if (!connectionEstablished(channel)) { // Closing the channel and reconnecting should be done before handling the messages. closeChannelAndReconnect(channel); handleMessagesWhenConnectionIsUnavailable(msgs); return; } // Collect messages into batches (to optimize network throughput), then flush them. while (msgs.hasNext()) { TaskMessage message = msgs.next(); if (messageBatch == null) { messageBatch = new MessageBatch(messageBatchSize); } messageBatch.add(message); // TODO: What shall we do if the channel is not writable? if (messageBatch.isFull()) { MessageBatch toBeFlushed = messageBatch; flushMessages(channel, toBeFlushed); messageBatch = null; } } // Handle any remaining messages in case the last batch was not full. if (containsMessages(messageBatch)) { if (connectionEstablished(channel) channel.isWritable()) { // We can write to the channel, so we flush the remaining messages immediately to minimize latency. pauseBackgroundFlushing(); MessageBatch toBeFlushed = messageBatch; messageBatch = null; flushMessages(channel, toBeFlushed); } else { // We cannot write to the channel, which means Netty's internal write buffer is full. // In this case, we buffer the remaining messages and wait for the next messages to arrive. // // Background: // Netty 3.x maintains an internal write buffer with a high water mark for each channel (default: 64K). // This represents the amount of data waiting to be flushed to operating system buffers. If the // outstanding data exceeds this value then the channel is set to non-writable. When this happens, a // INTEREST_CHANGED channel event is triggered. Netty sets the channel to writable again once the data // has been flushed to the system buffers. // // See http://stackoverflow.com/questions/14049260 resumeBackgroundFlushing(); nextBackgroundFlushTimeMs.set(nowMillis() + flushCheckIntervalMs); } } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user miguno commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-73671537 PS: I just noticed that I put the wrong Storm ticket into the commit https://github.com/miguno/storm/commit/8ebaaf8dbc63df3c2691e0cc3ac5102af7721ec3. The `STORM-327` prefix of the commit message should have been `STORM-329`. I will keep the current code/commits as is in order to not disrupt the ongoing discussion in this thread. Once we're happy with the code I will create a separate pull request, using the correct `STORM-329` ticket reference. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-73651894 +1 on the update, the patch is well written. I made a few comments under reivew panel of https://github.com/miguno/storm/commit/8ebaaf8dbc63df3c2691e0cc3ac5102af7721ec3#diff-e1bd524877b15ccf409f846e3c95da13R203 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user MaximeDiab commented on a diff in the pull request: https://github.com/apache/storm/pull/268#discussion_r24071455 --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java --- @@ -142,6 +147,15 @@ public void run() { } }; + +connector = new Runnable() { +@Override +public void run() { +if (!closing) { +connect(); +} +} +}; long initialDelay = Math.min(30L * 1000, max_sleep_ms * max_retries); //max wait for 30s --- End diff -- Math.min(arg1, arg2) method returns the smaller of the two arguments. in this case, the maximum delay corresponds to the minimum sent by .min method. The comment is perhaps not accurate, but I think this is not really a problem. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user tedxia commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-72860304 @miguno I test this on my cluster just now. The topology is more simple spout bolt0 bolt1, and each component only have one executor; I did't met the situation what you said, first I kill bolt1, and after 2 second bolt0 know bolt1 died and close the connection. And then after 31s, bolt0 start connect to another worker that contain bolt1. After bolt0 connect to new bolt1, new bolt1 receive immediately, I see this through ui acked number; 1 First I kill bolt1 at 21:48:07; 2 bolt0 know bolt1 died at 21:48:08 ``` 2015-02-04 21:48:08 b.s.m.n.StormClientErrorHandler [INFO] Connection failed Netty-Client-lg-hadoop-tst-st04.bj/10.2.201.70:42813 java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcher.read0(Native Method) ~[na:1.6.0_37] at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21) ~[na:1.6.0_37] at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198) ~[na:1.6.0_37] at sun.nio.ch.IOUtil.read(IOUtil.java:166) ~[na:1.6.0_37] at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245) ~[na:1.6.0_37] at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:322) ~[netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:281) ~[netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:201) ~[netty-3.2.2.Final.jar:na] at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46) [netty-3.2.2.Final.jar:na] at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) [na:1.6.0_37] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) [na:1.6.0_37] at java.lang.Thread.run(Thread.java:662) [na:1.6.0_37] 2015-02-04 21:48:08 b.s.m.n.Client [INFO] failed to send requests to lg-hadoop-tst-st04.bj/10.2.201.70:42813: java.nio.channels.ClosedChannelException: null at org.jboss.netty.channel.socket.nio.NioWorker.cleanUpWriteBuffer(NioWorker.java:629) [netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:605) [netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:356) [netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:281) [netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:201) [netty-3.2.2.Final.jar:na] at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [netty-3.2.2.Final.jar:na] at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46) [netty-3.2.2.Final.jar:na] at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) [na:1.6.0_37] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) [na:1.6.0_37] at java.lang.Thread.run(Thread.java:662) [na:1.6.0_37] ``` Then bolt0 start reconnect to bolt1, and stop send message to bolt1, ``` 2015-02-04 21:48:08 b.s.m.n.Client [ERROR] The Connection channel currently is not available, dropping pending 1 messages... 2015-02-04 21:48:08 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-lg-hadoop-tst-st04.bj/10.2.201.70:42813... [0] 2015-02-04 21:48:08 b.s.m.n.Client [INFO] failed to send requests to lg-hadoop-tst-st04.bj/10.2.201.70:42813: java.nio.channels.ClosedChannelException: null at org.jboss.netty.channel.socket.nio.NioWorker.cleanUpWriteBuffer(NioWorker.java:629) [netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:605) [netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:356) [netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:281) [netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:201) [netty-3.2.2.Final.jar:na] at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [netty-3.2.2.Final.jar:na] at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46) [netty-3.2.2.Final.jar:na] at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) [na:1.6.0_37] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) [na:1.6.0_37] at
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user miguno commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-72653588 @tedxia (or @clockfly): Have you experienced similar Storm behavior as [I described above](https://github.com/apache/storm/pull/268#issuecomment-72652704) in your patched production cluster? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user miguno commented on a diff in the pull request: https://github.com/apache/storm/pull/268#discussion_r24039811 --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java --- @@ -142,6 +147,15 @@ public void run() { } }; + +connector = new Runnable() { +@Override +public void run() { +if (!closing) { +connect(); +} +} +}; long initialDelay = Math.min(30L * 1000, max_sleep_ms * max_retries); //max wait for 30s --- End diff -- Either the comment or the code are incorrect. If we want to wait a maximum of 30s, then we should use `Math.max(...)`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user fmazoyer commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-71063105 Hello, we recently stumbled upon the issue STORM-404 in storm 0.9.3. I was just curious if some more work was planned to fix that guy? Or just, could the work already done for the issue 404 be summed up in some way? Thanks a lot for your help :-) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-67047625 @nathanmarz , I'd like to explain why I need to change worker.clj. This was also motivated by a legacy TODO in in zmq.clj. https://github.com/nathanmarz/storm/blob/0.8.2/src/clj/backtype/storm/messaging/zmq.clj#L43 ``` (send [this task message] ... (mq/send socket message ZMQ/NOBLOCK)) ;; TODO: how to do backpressure if doing noblock?... need to only unblock if the target disappears ``` As we can see, zeromq transport will send message in non-blocking way. If I understand this TODO correctly, it wants, a) When target worker is not booted yet, the source worker should not send message to target. Otherwise, as there is no backpressure, there will be message loss during the bootup phase. If it is un unacked topology, the message loss is permanent; if it is an acked topology, we will need to do unnecessary replay. b) When target worker disappears in the middle(crash?), the source worker should drop the messages directly. The problem is that: transport layer don't know by itself whether the target worker is booting up or crashed in the running phase, so it cannot smartly choose between back pressure or drop. If the transport simplifiy choose block, it is good for booting up phase, but bad for running phase. If one connection is down, it may block messages sent to other connections. If the transport simplify choose drop, it is good for running phase, but bad for booting up phase. If the target worker is booted 30 seconds later, all message between this 30 seconds will be dropped. The changes in worker.clj is targeted to solve this problem. Worker knows when the target worker connections are ready. In the bootup phase, worker.clj will wait target worker connection is ready, then it will activate the source worker tasks. In the âruntime phase, the transport will simply drop the messages if target worker crashed in the middle. There will be several benefits: 1. During cluster bootup, for unacked topology, there will be no strange message loss. 2. During cluster bootup, for acked topology, it can take less time to reach the normal throughput, as there is no message loss, timeout, and replay. 3. For transport layer, the design is simplified. We can just drop the messages if target worker is not available. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user tedxia commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-66948950 @ptgoetz ,I have merged master branch, and all test success. @clockfly , Will you explain the change to worker.clj, thank you. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-66620738 @tedxia Thanks, I think this will work. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-66620748 +1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user tedxia commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-66420678 @clockfly threadpool.schedule() is't a blocking operation, as ``` In particular, because it acts as a fixed-sized pool using corePoolSize threads and an unbounded queue, adjustments to maximumPoolSize have no useful effect. ``` For more information, please refer [ScheduledThreadPoolExecutor](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledThreadPoolExecutor.html) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user tedxia commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-66422581 I tested this on our cluster, and the result are below: Before remove sleep in connect ![storm](https://cloud.githubusercontent.com/assets/8066671/5373174/d4364ffc-808d-11e4-8641-8647ca4d3581.png) After remove sleep in connect ![stormx1](https://cloud.githubusercontent.com/assets/8066671/5373187/ed25ac7e-808d-11e4-99ce-b63e0d5ea399.png) After remove sleep in connect, we got more than 100% speed increase, the most important thing is that failed Client will never bind send message to other worker anymore; @clockfly will you test this on your cluster and give some advise. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-65763659 Thanks Tedï¼ Yes, we need a fine-grained lock at âsynchronrized connect() â. I see you changed it to use schedule Runnable, it may cause deadlock, as schedule is a blocking operation. connect() wait for threadpool.schedule() to release slots, threadpool.schedule() wait for connect() to exit Looking forward to your update on this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-63443152 It seems common in storm UT to have random failures. We may need to clean Storm UT to make it faster and more robust. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user tedxia commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-63424599 @ptgoetz I have merge storm/master to this patch and all tests pass except storm-core/test/clj/backtype/storm/multilang_test.clj, test results are(after I removed multilang_test.clj): ``` [INFO] Reactor Summary: [INFO] [INFO] Storm . SUCCESS [1.340s] [INFO] maven-shade-clojure-transformer ... SUCCESS [1.712s] [INFO] Storm Core SUCCESS [6:04.103s] [INFO] storm-starter . SUCCESS [7.010s] [INFO] storm-kafka ... SUCCESS [1:03.183s] [INFO] storm-hdfs SUCCESS [2.056s] [INFO] storm-hbase ... SUCCESS [2.186s] [INFO] Storm Binary Distribution . SUCCESS [0.185s] [INFO] Storm Source Distribution . SUCCESS [0.136s] ``` When I run multilang_test.clj, I got exception like this: ``` java.lang.Exception: Shell Process Exception: Exception in bolt: \xE4 on US-ASCII - /usr/lib/ruby/1.9.1/json/common.rb:148:in `encode'\n/usr/lib/ruby/1.9.1/json/common.rb:148:in `initialize'\n/usr/lib/ruby/1.9.1/json/common.rb:148:in `new'\n/usr/lib/ruby/1.9.1/json/common.rb:148:in `parse'\n/tmp/81b49de0-4ee0-493a-afe0-6286e393fb14/supervisor/stormdist/test-1-1416288370/resources/storm.rb:39:in `read_message'\n/tmp/81b49de0-4ee0-493a-afe0-6286e393fb14/supervisor/stormdist/test-1-1416288370/resources/storm.rb:57:in `read_command'\n/tmp/81b49de0-4ee0-493a-afe0-6286e393fb14/supervisor/stormdist/test-1-1416288370/resources/storm.rb:190:in `run'\ntester_bolt.rb:37:in `main' at backtype.storm.task.ShellBolt.handleError(ShellBolt.java:188) [classes/:na] at backtype.storm.task.ShellBolt.access$1100(ShellBolt.java:69) [classes/:na] at backtype.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:331) [classes/:na] at java.lang.Thread.run(Thread.java:662) [na:1.6.0_37] 124526 [Thread-1209] ERROR backtype.storm.task.ShellBolt - Halting process: ShellBolt died. java.lang.RuntimeException: backtype.storm.multilang.NoOutputException: Pipe to subprocess seems to be broken! No output read. Serializer Exception: at backtype.storm.utils.ShellProcess.readShellMsg(ShellProcess.java:101) ~[classes/:na] at backtype.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:318) ~[classes/:na] at java.lang.Thread.run(Thread.java:662) [na:1.6.0_37] 124527 [Thread-1209] ERROR backtype.storm.daemon.executor - java.lang.RuntimeException: backtype.storm.multilang.NoOutputException: Pipe to subprocess seems to be broken! No output read. Serializer Exception: at backtype.storm.utils.ShellProcess.readShellMsg(ShellProcess.java:101) ~[classes/:na] at backtype.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:318) ~[classes/:na] at java.lang.Thread.run(Thread.java:662) [na:1.6.0_37] ``` When I run multilang_test.clj at storm/master, I got the same exception, I think this may be my personal environment problem, can you merge this to 0.9.3 branch and run tests again, thanks a lot. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user tedxia commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-63430061 ``` I tested this by applying it to the 0.9.3 branch and found problems with the unit tests (never-ending cycle of zookeeper reconnects, tests never complete). ``` @ptgoetz Can you figure out which test cause this, I will check it again. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user ptgoetz commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-63127132 I tested this by applying it to the 0.9.3 branch and found problems with the unit tests (never-ending cycle of zookeeper reconnects, tests never complete). @tedxia @clockfly Could you guys take a look and perhaps up merge against the 0.9.3-branch branch? I might have made a mistake during conflict resolution. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user tedxia commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-62516293 @clockfly --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-62509705 @tedxia The thread pool size of clientScheduleService is decided by worker number (also =1 and = 10). For example, if there are 2 worker, the pool size is 1, if worker number is 4, then the pool size is 3. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user tedxia commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-61803120 ``` High Availability test test scenario: 4 machine A,B,C,D, 4 worker, 1 worker on each machine test case1(STORM-404): on machine A, kill worker. A will create a new worker taking the same port. ``` @clockfly ,in your case, I have some question about your case, storm scheduler will escape schedule a new worker at the same ip:port after a worker crash. And if storm schedule not schedule new worker as i said, in you test case2, the scheduler will schedule new worker on the same ip:port continuously, the behavior will not change as you occupy the port. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user HeartSaVioR commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-61404264 @clockfly Sorry I'm bothering you, but how did you check lost tuples from storm-perf-test? Lost tuples should be taken care of seriously, so I think we should separate this from current issue, and rollback STORM-350 immediately before releasing 0.9.3-rc2 if it's always reproducible. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-61410622 Hi HeartSaVioR, For acked topology, there is at least once delivery gurantee. When a tuple is dropped, the tuple cached at spout side will timeout, and it will report a failure to storm UI, it will be shown as failed count. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user HeartSaVioR commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-61411420 @clockfly OK, actually I did setup whole test environment using 0.9.3-rc1, but I ran perf test topology too short so maybe I can't see any problem. I'll test again and let you know. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-61373371 I traced back the bug of message loss. I found this issue is introduced by storm-350. (https://github.com/apache/storm/pull/134/files#diff-4cbcb6fa47274d6cf66d70585f98cbefR202) After upgrading from disruptor 2.10.1 to 3.2.1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-61392411 When I reverted STORM-350, and test it again. There is no more message loss. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-61393103 High Availability test === test scenario: 4 machine A,B,C,D, 4 worker, 1 worker on each machine test case1(STORM-404): on machine A, kill worker. A will create a new worker taking the same port. expected result: reconnection will succeed. experiment result: other worker will start to reconnect, eventually it succeed. Because A starts a new worker with same port. ``` 2014-11-02T09:31:24.988+0800 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-IDHV22-04/192.168.1.54:6703... [84] 2014-11-02T09:31:25.498+0800 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-IDHV22-04/192.168.1.54:6703... [85] 2014-11-02T09:31:25.498+0800 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-IDHV22-04/192.168.1.54:6703, [id: 0x54466bab, /192.168.1.51:51336 = IDHV22-04/192.168.1.54:6703] ``` test case2(STORM-404): on machine A, kill worker, then immediately start a process to occupy the port used by the worker, which will force storm to relocate the worker to a new port(or a new machine.) -- expected result: reconnection process will fail, becasue storm relocate the worker to a new port. Actual result: First after many reconnecton try, the reconnection is aborted, no exception thrown ``` 2014-11-02T09:31:14.753+0800 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-IDHV22-04/192.168.1.54:6703... [63] 2014-11-02T09:31:18.065+0800 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-IDHV22-04/192.168.1.54:6703... [70] at org.apache.storm.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:71) ~[storm-core-0.9.3-rc2-SNAPSHOT.jar:0.9.3-rc2-SNAPSHOT] ... 2014-11-02T09:45:36.209+0800 b.s.m.n.Client [INFO] Waiting for pending batchs to be sent with Netty-Client-IDHV22-04/192.168.1.54:6703..., timeout: 60ms, pendings: 0 2014-11-02T09:45:36.209+0800 b.s.m.n.Client [INFO] connection is closing, abort reconnecting... ``` Second, a new connection to new worker(with new port, or on another machine) (previous the worker is at IDHV22-04:6703, then relocate to IDHV22-03:6702) ``` 2014-11-02T09:45:36.206+0800 b.s.m.n.Client [INFO] New Netty Client, connect to IDHV22-03, 6702, config: , buffer_size: 5242880 2014-11-02T09:45:36.207+0800 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-IDHV22-03/192.168.1.53:6702, [id: 0x538fdacb, /192.168.1.51:56047 = IDHV22-03/192.168.1.53:6702] ``` test case3: check the failed message count before and after the worker crash expect result: after the worker crash, there will some message loss. After it stablize, the message loss will not increase. Actual result: meet expectation. test case4: check the throughput change before and after the worker crash -- expect result: There should be no performance drop. Actual result: When storm start a new worker on same machine, there is no performance drop. Check the first gap in the following image. ![network bandwidth change before and after worker crash](https://issues.apache.org/jira/secure/attachment/12678758/worker-kill-recover3.jpg) When storm start a new worker on different machine. It may impact the parallism. Check the second gap in above picture. Before worker crash, there are 4 worker on 4 machine. After worker crash, there are 3 worker on 4 machine. The parallism drops, so the throughput drops. test case5(STORM-510): when a target worker crash, the message sending to other workers should not be blocked. expect result: One connection should not block another in the case of worker crash. Actual result: In the code, the blocking logic is removed. So, one connection will not block another connection. However, in the transition period of failure, because there will be many message loss to the crashed worker, the max.spout.pending flow control may kicks in, the spout message sending speed will be slower. And overall the max throughput will be smaller. After the transition, it goes back to normal. In my test, the transition peroid is around 40second. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-61393801 Summary of the test: UT pass STORM-404 pass STORM-510 pass Performacne regression pass --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-61393817 Hi Ted, Can you try this on your live cluster and contribute some real case test result? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-61286844 About performance test: === I tested the performance of new patch. It has no sigificant difference with storm-0.92. About STORM-404 chained crash issue(one worker cause another worker to crash) With this patch, the reconnection is successfully aborted. And new connection is established. ``` 2014-10-31T23:00:11.738+0800 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-IDHV22-04/192.168.1.54:6703... [30] 2014-10-31T23:00:12.738+0800 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-IDHV22-04/192.168.1.54:6703 2014-10-31T23:00:12.739+0800 b.s.m.n.Client [INFO] Waiting for pending batchs to be sent with Netty-Client-IDHV22-04/192.168.1.54:6703..., timeout: 60ms, pendings: 0 2014-10-31T23:00:32.754+0800 o.a.s.c.r.ExponentialBackoffRetry [WARN] maxRetries too large (30). Pinning to 29 2014-10-31T23:00:32.754+0800 b.s.u.StormBoundedExponentialBackoffRetry [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries [30] 2014-10-31T23:00:32.754+0800 b.s.m.n.Client [INFO] New Netty Client, connect to IDHV22-01, 6702, config: , buffer_size: 5242880 2014-10-31T23:00:32.754+0800 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-IDHV22-01/192.168.1.51:6702... [0] 2014-10-31T23:00:32.755+0800 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-IDHV22-01/192.168.1.51:6702, [id: 0x4f7eb44b, /192.168.1.51:56592 = IDHV22-01/192.168.1.51:6702] ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-61287291 During this test, I found there was message loss, but it was not caused by this new patch. I traced back, seems the fault is introduced with changes between 0.9.2 and 0.9.3-rc1. I am still trying to find the root cause for this error. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-60870953 @tedxia I got a chance to chat with Ted online. In summary, he is descrbing the following case ï¼worker A - worker B): 1. B dies 2. after zk session timeout, zk knows B is dead 3. A is initiating the reconnection process to B. By default, it will retry 300 times at max.(it should be larger than 120second, based on the comments in config) â ``` # Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker.```â 4. zk is under heavy load(consider a zk tree which have 100 thoudsands nodes, and many many watchers), may take minutes to notify A that B is dead. 5. A didn't get notification from zk in time after 300 connection retries, reconnection failedï¼ it throws, which will cause the worker to exit. Basically there are two questions asked. First, whether we can assure the zookeeper is responsive( 1minute). Second, If worker doesn't get update of B from zookeeper after 300 reconnection retries, should we exit the worker or let worker continues to work? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-60871259 ``` If worker doesn't get update of B from zookeeper after 300 reconnection retries, should we exit the worker or let worker continues to work? ``` Current approach is: - if worker A get update of B from zk, it will abort the reconnection. and worker A will still stay alive and working. - if A doesn't get update from zk that B is dead, after timeout of 300 reconnection retries. then A will exit. In my opinion, there is no way to recover but exit. because a) A belive it must have live connection to B because application tell it that b) A cannot setup connection to B after exhausting every effort. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on a diff in the pull request: https://github.com/apache/storm/pull/268#discussion_r19519682 --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj --- @@ -378,9 +392,15 @@ _ (refresh-connections nil) _ (refresh-storm-active worker nil) + +receive-thread-shutdown (launch-receive-thread worker) + +;; make sure all messaging connections are ready for sending data, the netty messaging +;; client will drop the messages if client.send(msg) is called before the connection +;; is established. +_ (wait-messaging-connections-to-be-ready worker) --- End diff -- The wait here is to solve a corner case. When the topology boots up. Previsous approach will start spout immediately, which will use the IConnection layer before the IConnection is available. The wait-messaging-connections-to-be-ready is to ensure that the connection layer is ready before starting spout. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on a diff in the pull request: https://github.com/apache/storm/pull/268#discussion_r19065812 --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj --- @@ -330,6 +330,20 @@ (.send drainer node+port-socket))) (.clear drainer)) +;; Check whether this messaging connection is ready to send data +(defn is-connection-ready [^IConnection connection] + (if (instance? ConnectionWithStatus connection) --- End diff -- Here we have a type check (instance?), which may not be clean. To change IConnection may be cleaner, but that may bring compatible issues(support the user already implemented some messaging with current IConnection declaration) If the messaging connection has extended ConnectionWithStatus, then we will check the status. If the messaging connection has NOT extended ConnectionWithStatus, then we behavior in the same as before. (We will skip this check) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user clockfly commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-59487520 Support we are sending data from worker A to worker B, to solve STORM-404(Worker on one machine crashes due to a failure of another worker on another machine), I think we can adopt the following logics: case1: when B is down: 1. B is lost, but A is still belive B is alive. 2. A try to send data to B, and then it triggers reconnect 3. The Nimbus find B is lost, and notify A. 4. A got notification that B is down, it will need to interrupt the reconnection of step 2( by closing the connection) 5. The reconnection of step 2 is interuppted, it exit. it will not throw RuntimeException. The key change is at step 4. A need to interrupt the reconnection to an obsolete worker. case 2: when B is alive, but the connection from A to B is down 1. A trigger reconnection logic 2. reconnection timeout 3. A cannot handle this failure, A throws RuntimeException. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user HeartSaVioR commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-59529389 I have a question (maybe comment) about your PR. (Since I don't know Storm deeply, so it could be wrong. Please correct me if I'm wrong!) When we enqueue tuples to Client, queued tuples seems to be discarded when one worker is down and nimbus reassigns task to other worker, and finally worker changes task-socket relation. But if we enqueue tuples to Drainer, queued tuples may could be sent to new worker when task - socket cache is changed to new. If I'm right, it would be better to place flusher into TransferDrainer. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user harshach commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-58608071 @tedxia is it possible to add some specific unit tests for these changes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user tedxia commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-58608389 ok, I will add some specific unit tests later, thank you. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user nathanmarz commented on a diff in the pull request: https://github.com/apache/storm/pull/268#discussion_r18687511 --- Diff: storm-core/src/clj/backtype/storm/messaging/local.clj --- @@ -45,10 +46,10 @@ (let [send-queue (add-queue! queues-map lock storm-id port)] (.put send-queue (TaskMessage. taskId payload)) )) - (^void send [this ^Iterator iter] + (^void send [this ^ArrayList msgs] (let [send-queue (add-queue! queues-map lock storm-id port)] - (while (.hasNext iter) - (.put send-queue (.next iter))) + (fast-list-iter [task msgs] +(.put send-queue (TaskMessage. (.task task) (.message task --- End diff -- This needs type hints to avoid reflection. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user nathanmarz commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-58609408 -1 You need to explain these changes more, especially the changes to worker.clj --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user nathanmarz commented on a diff in the pull request: https://github.com/apache/storm/pull/268#discussion_r18687520 --- Diff: storm-core/src/jvm/backtype/storm/messaging/IConnection.java --- @@ -40,7 +41,7 @@ * @param msgs */ -public void send(IteratorTaskMessage msgs); +public void send(ArrayListTaskMessage msgs); --- End diff -- Why the change? And if it does need to be changed, it shouldn't be specific to ArrayList but should be the least restrictive interface possible. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user tedxia commented on a diff in the pull request: https://github.com/apache/storm/pull/268#discussion_r18688234 --- Diff: storm-core/src/jvm/backtype/storm/messaging/IConnection.java --- @@ -40,7 +41,7 @@ * @param msgs */ -public void send(IteratorTaskMessage msgs); +public void send(ArrayListTaskMessage msgs); --- End diff -- Actually, this don't need change, I will change it back. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-329 : buffer message in client and recon...
Github user tedxia commented on the pull request: https://github.com/apache/storm/pull/268#issuecomment-58614296 In this patch, Client have TimeCacheMap for cache new messages that has't been send out. Worker only need add message to corresponding Client, and Client will decide when to send the new arrival message. At most time Client will send new arrival message as soon as possible, but when the connect had been lost or there are older message. So I removed send in worker.clj. For Client, there is a State machine stand for Client's current state, such as NEW, CONNECTED, DISCONNECTED and CLOSED. At first Client' state is NEW, then we will connect remote worker by start a timer that run connect logic (we call it connector) immediately. If connect success, Client't state will become CONNECTED, connector will start a periodic timer for send message to remote worker (we call it flusher). If connect failed, it will retry a few times until success or Client's state become CLOSED. When Client's state become CLOSED, Client will be destoried. Flusher will send cached message periodically, and when flush failed Client's state will become DISCONNECTED and start connector immediately. For reduce message transfer delay, when some message arrival, we will start flusher immediately, and of course flusher only work when Client's state are CONNECTED. That it the main change for this patch. I am very sorry for all of this. As a new man for opensource, i will try my best to do better. Thanks all. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---