Hi Bryan:

I would expect the remote task to complete and return, despite the caller being 
interrupted.  The JERI subsystem (I’m guessing the invocation layer, but it 
might be the transport layer) might log an exception when it tried to return, 
but there was nobody on the calling end.

That’s consistent with a worst-case failure, where the caller drops off the 
network.  How is the called service supposed to know what happened to the 

In the case of a typical short operation, I wouldn’t see that as a big issue, 
as the wasted computational effort on the service side won’t be consequential.

In the case of a long-running operation where it becomes more likely that the 
caller wants to stop or cancel an operation (in addition to the possibility of 
having it interrupted), I’d try to break it into a series of operations 
(chunks), or setup an ongoing notification or update protocol.  You probably 
want to do that anyway, because clients probably would like to see interim 
updates while a long operation is in process.

Unfortunately, I don’t think these kinds of things can be reasonably handled in 
a communication layer - the application almost always needs to be involved in 
the resolution of a problem when we have a service-oriented system.



On Dec 20, 2013, at 8:12 AM, Bryan Thompson <br...@systap.com> wrote:

> Hello, I put together a test to examine what would occur if a local service 
> submits an RMI (via jeri) to a remote service. This unit test setups up a 
> remote service (in a child JVM) and then issues an RMI that invokes a {@link 
> Thread#sleep(long)} method on the remote service. The thread that issues the 
> RMI is then interrupted during the sleep. The exception when the local thread 
> is interrupted is provided below.
> With reference to the SleepTask below, what I see is on the remote service is:
> WARN : 08:03:48,137 2861      
> com.bigdata.journal.jini.ha.HAJournalTest.executorService1 
> com.bigdata.journal.jini.ha.TestHAJournalServer$SleepTask.call(TestHAJournalServer.java:498):
>  Will sleep: millis=6000
> …
> WARN : 08:03:54,138 8862      
> com.bigdata.journal.jini.ha.HAJournalTest.executorService1 
> com.bigdata.journal.jini.ha.TestHAJournalServer$SleepTask.call(TestHAJournalServer.java:501):
>  Sleep finished normally.
> WARN : 08:03:54,139 8863      
> com.bigdata.journal.jini.ha.HAJournalTest.executorService1 
> com.bigdata.journal.jini.ha.TestHAJournalServer$SleepTask.call(TestHAJournalServer.java:508):
>  Did sleep: millis=6000
> Thus, it appears that the interrupt of the local thread making the RMI does 
> NOT interrupt the thread that is executing the behavior on the remote service 
> (in this case, Thread.sleep()).
> A snip of this test is below – it depends on our test harness environment, 
> but I could isolate it to a river only test if desired (this would be easier 
> if I had a pointer to a pattern for starting and killing the child process 
> that I could use for a river test).  But first I wanted to see what people's 
> expectations were for the remote service when the RMI is interrupted. Should 
> there be an attempt to propagate the interrupt to the method on the remote 
> service?
> Thanks,
> Bryan
> java.io.IOException: request I/O interrupted
> java.rmi.UnmarshalException: exception unmarshalling response; nested 
> exception is:
> java.io.IOException: request I/O interrupted
> at 
> net.jini.jeri.BasicInvocationHandler.invokeRemoteMethodOnce(BasicInvocationHandler.java:847)
> at 
> net.jini.jeri.BasicInvocationHandler.invokeRemoteMethod(BasicInvocationHandler.java:659)
> at 
> net.jini.jeri.BasicInvocationHandler.invoke(BasicInvocationHandler.java:528)
> at $Proxy2.submit(Unknown Source)
> at 
> com.bigdata.journal.jini.ha.TestHAJournalServer$1.call(TestHAJournalServer.java:399)
> at 
> com.bigdata.journal.jini.ha.TestHAJournalServer$1.call(TestHAJournalServer.java:1)
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
> at java.util.concurrent.FutureTask.run(FutureTask.java:166)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
> at java.util.concurrent.FutureTask.run(FutureTask.java:166)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> at java.lang.Thread.run(Thread.java:722)
> Caused by: java.io.IOException: request I/O interrupted
> at 
> com.sun.jini.jeri.internal.mux.Session$MuxInputStream.read(Session.java:833)
> at 
> net.jini.jeri.connection.ConnectionManager$Outbound$Input.read(ConnectionManager.java:550)
> at net.jini.jeri.BasicObjectEndpoint.executeCall(BasicObjectEndpoint.java:410)
> at 
> net.jini.jeri.BasicInvocationHandler.invokeRemoteMethodOnce(BasicInvocationHandler.java:806)
> ... 15 more
> Caused by: java.lang.InterruptedException
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:503)
> at 
> com.sun.jini.jeri.internal.mux.Session$MuxInputStream.read(Session.java:829)
> ... 18 more
>    /**
>     * This test is used to characterize what happens when we interrupt an RMI.
>     * Most methods on the {@link HAGlue} interface are synchronous - they 
> block
>     * while some behavior is executed. This is even true for some methods that
>     * return a {@link Future} in order to avoid overhead associated with the
>     * export of a proxy and DGC thread leaks (since fixed in River).
>     * <p>
>     * This unit test setups up a service and then issues an RMI that invokes a
>     * {@link Thread#sleep(long)} method on the service. The thread that issues
>     * the RMI is then interrupted during the sleep.
>     *
>     * @throws Exception
>     */
>    public void test_interruptRMI() throws Exception {
>        // Start a service.
>        final HAGlue serverA = startA();
>        final AtomicReference<Throwable> localCause = new 
> AtomicReference<Throwable>();
>        final ExecutorService executorService = Executors
>                .newSingleThreadScheduledExecutor(DaemonThreadFactory
>                        .defaultThreadFactory());
>        try {
>            final FutureTask<Void> localFuture = new FutureTask<Void>(
>                    new Callable<Void>() {
>                        @Override
>                        public Void call() throws Exception {
>                            try {
>                                final Future<Void> ft = ((HAGlueTest) serverA)
>                                        .submit(new SleepTask(6000/* ms */),
>                                                false/* asyncFuture */);
>                                return ft.get();
>                            } catch (Throwable t) {
>                                localCause.set(t);
>                                log.error(t, t);
>                                throw new RuntimeException(t);
>                            } finally {
>                                log.warn("Local submit of remote task is 
> done.");
>                            }
>                        }
>                    });
>            /*
>             * Submit task that will execute sleep on A. This task will block
>             * until A finishes its sleep. When we cancel this task, the RMI to
>             * A will be interrupted.
>             */
>            executorService.execute(localFuture);
>            // Wait a bit to ensure that the task was started on A.
>            Thread.sleep(2000/* ms */);
>            // interrupt the local future. will cause interrupt of the RMI.
>            localFuture.cancel(true/*mayInterruptIfRunning*/);
>        } finally {
>            executorService.shutdownNow();
>        }
>        /*
>         * The local root cause of the RMI failure is an InterruptedException.
>         *
>         * Note: There is a data race between when the [localCause] is set and
>         * when we exit the code block above. This is because we are
>         * interrupting the local task and have no means to await the 
> completion
>         * of its error handling routine which sets the [localCause].
>         */
>        {
>            assertCondition(new Runnable() {
>                @Override
>                public void run() {
>                    final Throwable tmp = localCause.get();
>                    assertNotNull(tmp);
>                    assertTrue(InnerCause.isInnerCause(tmp,
>                            InterruptedException.class));
>                }
>            }, 10000/*timeout*/, TimeUnit.MILLISECONDS);
>        }
>        /*
>         * Verify the root cause as observed by A for the interrupt. It should
>         * also be an InterruptedException.
>         *
>         * Note: Again, there is a data race.
>         *
>         * Note: Because we might retry this, we do NOT use the 
> getAndClearXXX()
>         * method to recover the remote exception.
>         */
>        {
>            assertCondition(new Runnable() {
>                @Override
>                public void run() {
>                    Throwable tmp;
>                    try {
>                        tmp = ((HAGlueTest) serverA).getLastRootCause();
>                    } catch (IOException e) {
>                        throw new RuntimeException(e);
>                    }
>                    assertNotNull(tmp);
>                    log.warn("Received non-null lastRootCause=" + tmp, tmp);
>                    assertTrue(InnerCause.isInnerCause(tmp,
>                            InterruptedException.class));
>                }
>            }, 10000/* timeout */, TimeUnit.MILLISECONDS);
>        }
>    }
>    /**
>     * Task sleeps for a specified duration.
>     *
>     * @author <a href="mailto:thompson...@users.sourceforge.net";>Bryan
>     *         Thompson</a>
>     */
>    private static class SleepTask extends IndexManagerCallable<Void> {
>        private static final long serialVersionUID = 1L;
>        private long millis;
>        SleepTask(final long millis) {
>            this.millis = millis;
>        }
>        @Override
>        public Void call() throws Exception {
>            log.warn("Will sleep: millis=" + millis);
>            try {
>                Thread.sleep(millis);
>                log.warn("Sleep finished normally.");
>            } catch (Throwable t) {
>                log.error("Exception during sleep: "+t, t);
>                ((HAJournalTest) getIndexManager()).getRemoteImpl()
>                        .setLastRootCause(t);
>                throw new RuntimeException(t);
>            } finally {
>                log.warn("Did sleep: millis=" + millis);
>            }
>            return null;
>        }
>    }

Reply via email to