[ https://issues.apache.org/jira/browse/STORM-2175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15616279#comment-15616279 ]

Robert Joseph Evans commented on STORM-2175:
--------------------------------------------

OK, I was looking through the logs and found what happened (and honestly I think 
it is actually a good thing).

When shutting down a local mode cluster inside testing.clj we have:

{code}
  (doseq [s @(:supervisors cluster-map)]
    (.shutdownAllWorkers s)
    ;; race condition here? will it launch the workers again?
    (.close s))
  (ProcessSimulator/killAllProcesses)
{code}

NOTE: the comment above is not what is causing this issue.

So all of the supervisors are shut down, first by killing all of the worker 
processes and then by closing the supervisor.
After all of the supervisors are shut down, just to be sure, we then kill all 
of the processes still registered with the process simulator.

The code for killing all of the workers in a supervisor is the following...

{code}
    public synchronized void shutdownAllWorkers() {
        for (Slot slot: slots.values()) {
            slot.setNewAssignment(null);
        }

        for (Slot slot: slots.values()) {
            try {
                int count = 0;
                while (slot.getMachineState() != MachineState.EMPTY) {
                    if (count > 10) {
                        LOG.warn("DONE waiting for {} to finish {}", slot, 
slot.getMachineState());
                        break;
                    }
                    if (Time.isSimulating()) {
                        Time.advanceTime(1000);
                        Thread.sleep(100);
                    } else {
                        Time.sleep(100);
                    }
                    count++;
                }
            } catch (Exception e) {
                LOG.error("Error trying to shutdown workers in {}", slot, e);
            }
        }
    }
{code}

It tells each Slot that it is not assigned anything anymore and waits for it to 
kill the worker under it.  I saw in the logs that the one worker in question 
timed out ("DONE waiting for...") and the shutdown went on to kill/shut down 
other things.

The ProcessSimulator code to kill a local mode worker is:

{code}
    /**
     * Kill a process
     *
     * @param pid the id of the process to kill
     */
    public static void killProcess(String pid) {
        synchronized (lock) {
            LOG.info("Begin killing process " + pid);
            Shutdownable shutdownHandle = processMap.get(pid);
            if (shutdownHandle != null) {
                shutdownHandle.shutdown();
            }
            processMap.remove(pid);
            LOG.info("Successfully killed process " + pid);
        }
    }

    /**
     * Kill all processes
     */
    public static void killAllProcesses() {
        Set<String> pids = processMap.keySet();
        for (String pid : pids) {
            killProcess(pid);
        }
    }
{code}

Inside the logs I don't see a corresponding "Successfully killed process " for 
the "Begin killing process ".  This means that an exception was thrown during 
the shutdown process.  Looking at the code, the only exception that could have 
caused this is an InterruptedException or something caused by it (or else the 
entire process would have exited).
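
To make the failure mode concrete, here is a minimal, self-contained sketch (StaleHandleDemo and its map are hypothetical stand-ins, not the real Storm classes): when shutdown() throws, the remove() on the following line never runs and the half-dead worker stays registered.

{code}
import java.util.HashMap;
import java.util.Map;

public class StaleHandleDemo {
    interface Shutdownable {
        void shutdown();
    }

    private static final Map<String, Shutdownable> processMap = new HashMap<>();

    public static void main(String[] args) {
        // A worker whose shutdown fails partway through, standing in for
        // the InterruptedException-driven failure seen in the logs.
        processMap.put("worker-1", () -> {
            throw new RuntimeException("simulated failure during shutdown");
        });

        try {
            processMap.get("worker-1").shutdown(); // throws here...
            processMap.remove("worker-1");         // ...so this never runs
        } catch (RuntimeException e) {
            System.out.println("shutdown failed: " + e.getMessage());
        }

        // The half-shut-down worker is still registered, so a later
        // killAllProcesses() sweep will try to shut it down a second time.
        System.out.println("still registered: " + processMap.containsKey("worker-1"));
    }
}
{code}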

This means that the ProcessSimulator partially shut down the worker, the worker 
threw the exception, and ProcessSimulator never removed the worker from the 
map.  So when the follow-on code runs to kill anything still registered with 
the process simulator that might have leaked, it ends up trying to shoot the 
same process yet again, which the code we found does not like.
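
One way to get "at least once" behavior out of killProcess would be to drop the handle no matter what shutdown() does, e.g. with a try/finally.  This is only a sketch against the code quoted above, not the actual patch:

{code}
    public static void killProcess(String pid) {
        synchronized (lock) {
            LOG.info("Begin killing process " + pid);
            Shutdownable shutdownHandle = processMap.get(pid);
            try {
                if (shutdownHandle != null) {
                    shutdownHandle.shutdown();
                }
            } finally {
                // Drop the handle even if shutdown() threw, so a later
                // killAllProcesses() pass cannot shoot the same worker twice.
                processMap.remove(pid);
            }
            LOG.info("Successfully killed process " + pid);
        }
    }
{code}

With this shape the "Successfully killed" log is still skipped on failure, so the logs keep telling us when a shutdown only half-finished.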

Everything seems perfectly reasonable, except for the part where some of our 
data structures don't like being shut down multiple times.  This goes against 
what most other software does.  Exactly once is hard, so let's make sure at 
least once works.
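
The other half would be making the shutdown paths themselves idempotent, so that a second shutdown() call is a harmless no-op instead of an NPE or "Timer is not active".  A common pattern for that (just a sketch of the idea, not Storm's actual DisruptorQueue or Timer code) is an atomic guard:

{code}
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper showing the "at least once" idea: the real work
// runs for exactly one caller and every later call returns immediately.
public abstract class IdempotentShutdown implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    @Override
    public final void close() {
        // compareAndSet flips false -> true for exactly one caller.
        if (closed.compareAndSet(false, true)) {
            doClose();
        }
    }

    // Release threads, timers, queues, etc.  Guarded by the flag above,
    // so a second shutdown cannot hit "already stopped"-style errors.
    protected abstract void doClose();
}
{code}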

> Supervisor V2 can possibly shut down workers twice in local mode
> ----------------------------------------------------------------
>
>                 Key: STORM-2175
>                 URL: https://issues.apache.org/jira/browse/STORM-2175
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-core
>    Affects Versions: 2.0.0, 1.1.0
>            Reporter: Robert Joseph Evans
>            Assignee: Robert Joseph Evans
>
> See https://github.com/apache/storm/pull/1697#issuecomment-256456889
> {code}
> java.lang.NullPointerException
>     at org.apache.storm.utils.DisruptorQueue$FlusherPool.stop(DisruptorQueue.java:110)
>     at org.apache.storm.utils.DisruptorQueue$Flusher.close(DisruptorQueue.java:293)
>     at org.apache.storm.utils.DisruptorQueue.haltWithInterrupt(DisruptorQueue.java:410)
>     at org.apache.storm.disruptor$halt_with_interrupt_BANG_.invoke(disruptor.clj:77)
>     at org.apache.storm.daemon.executor$mk_executor$reify__4923.shutdown(executor.clj:412)
>     at sun.reflect.GeneratedMethodAccessor303.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
>     at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
>     at org.apache.storm.daemon.worker$fn__5550$exec_fn__1372__auto__$reify__5552$shutdown_STAR___5572.invoke(worker.clj:668)
>     at org.apache.storm.daemon.worker$fn__5550$exec_fn__1372__auto__$reify$reify__5598.shutdown(worker.clj:706)
>     at org.apache.storm.ProcessSimulator.killProcess(ProcessSimulator.java:66)
>     at org.apache.storm.ProcessSimulator.killAllProcesses(ProcessSimulator.java:79)
>     at org.apache.storm.testing$kill_local_storm_cluster.invoke(testing.clj:207)
>     at org.apache.storm.testing4j$_withLocalCluster.invoke(testing4j.clj:93)
>     at org.apache.storm.Testing.withLocalCluster(Unknown Source)
> {code}
> and
> {code}
> java.lang.IllegalStateException: Timer is not active
>     at org.apache.storm.timer$check_active_BANG_.invoke(timer.clj:87)
>     at org.apache.storm.timer$cancel_timer.invoke(timer.clj:120)
>     at org.apache.storm.daemon.worker$fn__5550$exec_fn__1372__auto__$reify__5552$shutdown_STAR___5572.invoke(worker.clj:682)
>     at org.apache.storm.daemon.worker$fn__5550$exec_fn__1372__auto__$reify$reify__5598.shutdown(worker.clj:706)
>     at org.apache.storm.ProcessSimulator.killProcess(ProcessSimulator.java:66)
>     at org.apache.storm.ProcessSimulator.killAllProcesses(ProcessSimulator.java:79)
>     at org.apache.storm.testing$kill_local_storm_cluster.invoke(testing.clj:207)
>     at org.apache.storm.testing4j$_withLocalCluster.invoke(testing4j.clj:93)
>     at org.apache.storm.Testing.withLocalCluster(Unknown Source)
> {code}
> [~Srdo] is still working on getting a reproducible use case for us, but I 
> will try to reproduce/fix it myself in the meantime.


