This is one of the places where a lease could help.  An extension of the 
existing JERI details could add a lease into the dispatcher layer so that a 
constant “I am here” message would come through to the service.  If the client 
thread is interrupted it would no longer be pinging/notifying of it’s interest 
in the results.  That would allow the service end, to take appropriate actions. 
 I think that I’d want the export operation or exporter creation, to include 
the setup of a call back that would occur when an client wants something to 
stop.  I would make the API include a “correlation-ID”, and I’d have that 
passed into the call to do work, and passed into the call back for cancellation.

Gregg

On Dec 20, 2013, at 7:12 AM, Bryan Thompson <br...@systap.com> wrote:

> Hello, I put together a test to examine what would occur if a local service 
> submits an RMI (via jeri) to a remote service. This unit test setups up a 
> remote service (in a child JVM) and then issues an RMI that invokes a {@link 
> Thread#sleep(long)} method on the remote service. The thread that issues the 
> RMI is then interrupted during the sleep. The exception when the local thread 
> is interrupted is provided below.
> 
> 
> With reference to the SleepTask below, what I see is on the remote service is:
> 
> 
> WARN : 08:03:48,137 2861      
> com.bigdata.journal.jini.ha.HAJournalTest.executorService1 
> com.bigdata.journal.jini.ha.TestHAJournalServer$SleepTask.call(TestHAJournalServer.java:498):
>  Will sleep: millis=6000
> 
> …
> WARN : 08:03:54,138 8862      
> com.bigdata.journal.jini.ha.HAJournalTest.executorService1 
> com.bigdata.journal.jini.ha.TestHAJournalServer$SleepTask.call(TestHAJournalServer.java:501):
>  Sleep finished normally.
> WARN : 08:03:54,139 8863      
> com.bigdata.journal.jini.ha.HAJournalTest.executorService1 
> com.bigdata.journal.jini.ha.TestHAJournalServer$SleepTask.call(TestHAJournalServer.java:508):
>  Did sleep: millis=6000
> 
> 
> Thus, it appears that the interrupt of the local thread making the RMI does 
> NOT interrupt the thread that is executing the behavior on the remote service 
> (in this case, Thread.sleep()).
> 
> 
> A snip of this test is below – it depends on our test harness environment, 
> but I could isolate it to a river only test if desired (this would be easier 
> if I had a pointer to a pattern for starting and killing the child process 
> that I could use for a river test).  But first I wanted to see what people's 
> expectations were for the remote service when the RMI is interrupted. Should 
> there be an attempt to propagate the interrupt to the method on the remote 
> service?
> 
> 
> Thanks,
> 
> Bryan
> 
> 
> java.io.IOException: request I/O interrupted
> 
> java.rmi.UnmarshalException: exception unmarshalling response; nested 
> exception is:
> 
> java.io.IOException: request I/O interrupted
> 
> at 
> net.jini.jeri.BasicInvocationHandler.invokeRemoteMethodOnce(BasicInvocationHandler.java:847)
> 
> at 
> net.jini.jeri.BasicInvocationHandler.invokeRemoteMethod(BasicInvocationHandler.java:659)
> 
> at 
> net.jini.jeri.BasicInvocationHandler.invoke(BasicInvocationHandler.java:528)
> 
> at $Proxy2.submit(Unknown Source)
> 
> at 
> com.bigdata.journal.jini.ha.TestHAJournalServer$1.call(TestHAJournalServer.java:399)
> 
> at 
> com.bigdata.journal.jini.ha.TestHAJournalServer$1.call(TestHAJournalServer.java:1)
> 
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
> 
> at java.util.concurrent.FutureTask.run(FutureTask.java:166)
> 
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> 
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
> 
> at java.util.concurrent.FutureTask.run(FutureTask.java:166)
> 
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
> 
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
> 
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> 
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> 
> at java.lang.Thread.run(Thread.java:722)
> 
> Caused by: java.io.IOException: request I/O interrupted
> 
> at 
> com.sun.jini.jeri.internal.mux.Session$MuxInputStream.read(Session.java:833)
> 
> at 
> net.jini.jeri.connection.ConnectionManager$Outbound$Input.read(ConnectionManager.java:550)
> 
> at net.jini.jeri.BasicObjectEndpoint.executeCall(BasicObjectEndpoint.java:410)
> 
> at 
> net.jini.jeri.BasicInvocationHandler.invokeRemoteMethodOnce(BasicInvocationHandler.java:806)
> 
> ... 15 more
> 
> Caused by: java.lang.InterruptedException
> 
> at java.lang.Object.wait(Native Method)
> 
> at java.lang.Object.wait(Object.java:503)
> 
> at 
> com.sun.jini.jeri.internal.mux.Session$MuxInputStream.read(Session.java:829)
> 
> ... 18 more
> 
> 
>    /**
> 
>     * This test is used to characterize what happens when we interrupt an RMI.
> 
>     * Most methods on the {@link HAGlue} interface are synchronous - they 
> block
> 
>     * while some behavior is executed. This is even true for some methods that
> 
>     * return a {@link Future} in order to avoid overhead associated with the
> 
>     * export of a proxy and DGC thread leaks (since fixed in River).
> 
>     * <p>
> 
>     * This unit test setups up a service and then issues an RMI that invokes a
> 
>     * {@link Thread#sleep(long)} method on the service. The thread that issues
> 
>     * the RMI is then interrupted during the sleep.
> 
>     *
> 
>     * @throws Exception
> 
>     */
> 
>    public void test_interruptRMI() throws Exception {
> 
> 
>        // Start a service.
> 
>        final HAGlue serverA = startA();
> 
> 
>        final AtomicReference<Throwable> localCause = new 
> AtomicReference<Throwable>();
> 
> 
>        final ExecutorService executorService = Executors
> 
>                .newSingleThreadScheduledExecutor(DaemonThreadFactory
> 
>                        .defaultThreadFactory());
> 
> 
> 
>        try {
> 
> 
>            final FutureTask<Void> localFuture = new FutureTask<Void>(
> 
>                    new Callable<Void>() {
> 
> 
>                        @Override
> 
>                        public Void call() throws Exception {
> 
> 
>                            try {
> 
>                                final Future<Void> ft = ((HAGlueTest) serverA)
> 
>                                        .submit(new SleepTask(6000/* ms */),
> 
>                                                false/* asyncFuture */);
> 
> 
>                                return ft.get();
> 
>                            } catch (Throwable t) {
> 
>                                localCause.set(t);
> 
>                                log.error(t, t);
> 
>                                throw new RuntimeException(t);
> 
>                            } finally {
> 
>                                log.warn("Local submit of remote task is 
> done.");
> 
>                            }
> 
>                        }
> 
>                    });
> 
>            /*
> 
>             * Submit task that will execute sleep on A. This task will block
> 
>             * until A finishes its sleep. When we cancel this task, the RMI to
> 
>             * A will be interrupted.
> 
>             */
> 
>            executorService.execute(localFuture);
> 
> 
> 
>            // Wait a bit to ensure that the task was started on A.
> 
>            Thread.sleep(2000/* ms */);
> 
> 
>            // interrupt the local future. will cause interrupt of the RMI.
> 
>            localFuture.cancel(true/*mayInterruptIfRunning*/);
> 
> 
>        } finally {
> 
> 
> 
>            executorService.shutdownNow();
> 
> 
> 
>        }
> 
> 
>        /*
> 
>         * The local root cause of the RMI failure is an InterruptedException.
> 
>         *
> 
>         * Note: There is a data race between when the [localCause] is set and
> 
>         * when we exit the code block above. This is because we are
> 
>         * interrupting the local task and have no means to await the 
> completion
> 
>         * of its error handling routine which sets the [localCause].
> 
>         */
> 
>        {
> 
>            assertCondition(new Runnable() {
> 
>                @Override
> 
>                public void run() {
> 
>                    final Throwable tmp = localCause.get();
> 
>                    assertNotNull(tmp);
> 
>                    assertTrue(InnerCause.isInnerCause(tmp,
> 
>                            InterruptedException.class));
> 
>                }
> 
>            }, 10000/*timeout*/, TimeUnit.MILLISECONDS);
> 
>        }
> 
> 
>        /*
> 
>         * Verify the root cause as observed by A for the interrupt. It should
> 
>         * also be an InterruptedException.
> 
>         *
> 
>         * Note: Again, there is a data race.
> 
>         *
> 
>         * Note: Because we might retry this, we do NOT use the 
> getAndClearXXX()
> 
>         * method to recover the remote exception.
> 
>         */
> 
>        {
> 
>            assertCondition(new Runnable() {
> 
>                @Override
> 
>                public void run() {
> 
>                    Throwable tmp;
> 
>                    try {
> 
>                        tmp = ((HAGlueTest) serverA).getLastRootCause();
> 
>                    } catch (IOException e) {
> 
>                        throw new RuntimeException(e);
> 
>                    }
> 
>                    assertNotNull(tmp);
> 
>                    log.warn("Received non-null lastRootCause=" + tmp, tmp);
> 
>                    assertTrue(InnerCause.isInnerCause(tmp,
> 
>                            InterruptedException.class));
> 
>                }
> 
>            }, 10000/* timeout */, TimeUnit.MILLISECONDS);
> 
>        }
> 
> 
> 
>    }
> 
> 
>    /**
> 
>     * Task sleeps for a specified duration.
> 
>     *
> 
>     * @author <a href="mailto:thompson...@users.sourceforge.net";>Bryan
> 
>     *         Thompson</a>
> 
>     */
> 
>    private static class SleepTask extends IndexManagerCallable<Void> {
> 
> 
>        private static final long serialVersionUID = 1L;
> 
> 
>        private long millis;
> 
> 
>        SleepTask(final long millis) {
> 
>            this.millis = millis;
> 
>        }
> 
> 
>        @Override
> 
>        public Void call() throws Exception {
> 
>            log.warn("Will sleep: millis=" + millis);
> 
>            try {
> 
>                Thread.sleep(millis);
> 
>                log.warn("Sleep finished normally.");
> 
>            } catch (Throwable t) {
> 
>                log.error("Exception during sleep: "+t, t);
> 
>                ((HAJournalTest) getIndexManager()).getRemoteImpl()
> 
>                        .setLastRootCause(t);
> 
>                throw new RuntimeException(t);
> 
>            } finally {
> 
>                log.warn("Did sleep: millis=" + millis);
> 
>            }
> 
>            return null;
> 
>        }
> 
> 
>    }
> 

Reply via email to