GitHub user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/1417#discussion_r79412277
--- Diff: storm-core/src/jvm/org/apache/storm/messaging/local/Context.java
---
@@ -82,31 +86,73 @@ public void close() {
private static class LocalClient implements IConnection {
private final LocalServer _server;
+ //Messages sent before the server registered a callback
+ private final LinkedBlockingQueue<TaskMessage> _pendingDueToUnregisteredServer;
+ private final ScheduledExecutorService _pendingFlusher;
public LocalClient(LocalServer server) {
_server = server;
+ _pendingDueToUnregisteredServer = new LinkedBlockingQueue<>();
+ _pendingFlusher = Executors.newScheduledThreadPool(1, new ThreadFactory(){
+ @Override
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable);
+ thread.setName("LocalClientFlusher-" + thread.getId());
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+ _pendingFlusher.scheduleAtFixedRate(new Runnable(){
+ @Override
+ public void run(){
+ try {
+ //Ensure messages are flushed even if no more sends are performed
+ flushPending();
+ } catch (Throwable t) {
+ LOG.error("Uncaught throwable in pending message flusher thread, messages may be lost", t);
+ throw t;
+ }
+ }
+ }, 5, 5, TimeUnit.SECONDS);
}
@Override
public void registerRecv(IConnectionCallback cb) {
throw new IllegalArgumentException("SHOULD NOT HAPPEN");
}
-
+
+ private void flushPending(){
+ if (_server._cb != null && !_pendingDueToUnregisteredServer.isEmpty()) {
+ ArrayList<TaskMessage> ret = new ArrayList<>();
+ _pendingDueToUnregisteredServer.drainTo(ret);
+ _server._cb.recv(ret);
--- End diff --
If _cb is marked as volatile, does that mean we expect it to change at any
point in time? If so, the null check alone is not enough, because the field
could change between the check and the call. We should read it into a local
variable once, then do the null check and the call on that local (everywhere
_cb is used).
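
A minimal sketch of the pattern suggested above, not the actual Storm code:
the class, interface, and field names below (CallbackSnapshotSketch,
Callback, Server, Client, pending) are hypothetical stand-ins for
LocalServer, IConnectionCallback, and LocalClient, and String stands in for
TaskMessage. The point it illustrates is the single volatile read into a
local before the null check and the call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified stand-ins for the classes in the diff.
final class CallbackSnapshotSketch {

    interface Callback {
        void recv(List<String> batch);
    }

    static final class Server {
        // May be assigned by another thread at any time.
        volatile Callback cb;
    }

    static final class Client {
        private final Server server;
        private final LinkedBlockingQueue<String> pending = new LinkedBlockingQueue<>();

        Client(Server server) {
            this.server = server;
        }

        void send(String msg) {
            pending.add(msg);
            flushPending();
        }

        // Returns the number of messages handed to the callback.
        int flushPending() {
            // Read the volatile field exactly once; the null check and the
            // call below both use this snapshot, so a concurrent write to
            // server.cb cannot slip in between them.
            Callback cb = server.cb;
            if (cb == null || pending.isEmpty()) {
                return 0;
            }
            List<String> batch = new ArrayList<>();
            pending.drainTo(batch);
            if (batch.isEmpty()) {
                // Another thread drained the queue after the isEmpty() check.
                return 0;
            }
            cb.recv(batch);
            return batch.size();
        }
    }
}
```

With this shape, messages sent before a callback is registered stay queued,
and the first flush after registration delivers them in order. The snapshot
only protects against the field changing between the check and the call; it
does not by itself coordinate two threads flushing at the same time.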
---