Hello, I put together a test to examine what happens when a local service
issues an RMI (via JERI) to a remote service and the calling thread is then
interrupted. The unit test sets up a remote service (in a child JVM) and
issues an RMI that invokes Thread.sleep(long) on the remote service. The
thread that issues the RMI is interrupted during the sleep. The exception
observed in the local thread when it is interrupted is provided below.


With reference to the SleepTask below, what I see on the remote service is:


WARN : 08:03:48,137 2861      com.bigdata.journal.jini.ha.HAJournalTest.executorService1 com.bigdata.journal.jini.ha.TestHAJournalServer$SleepTask.call(TestHAJournalServer.java:498): Will sleep: millis=6000

…
WARN : 08:03:54,138 8862      com.bigdata.journal.jini.ha.HAJournalTest.executorService1 com.bigdata.journal.jini.ha.TestHAJournalServer$SleepTask.call(TestHAJournalServer.java:501): Sleep finished normally.
WARN : 08:03:54,139 8863      com.bigdata.journal.jini.ha.HAJournalTest.executorService1 com.bigdata.journal.jini.ha.TestHAJournalServer$SleepTask.call(TestHAJournalServer.java:508): Did sleep: millis=6000


Thus, it appears that the interrupt of the local thread making the RMI does NOT 
interrupt the thread that is executing the behavior on the remote service (in 
this case, Thread.sleep()).
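
For comparison, here is a minimal single-JVM sketch of the same shape with no River involved. It is only an analogy, not the JERI code path: a single-thread executor stands in for the remote service, and a caller thread blocked in Future.get() stands in for the thread blocked in the RMI. The class and method names are made up for illustration. Interrupting the caller unblocks the caller only; the worker's sleep is not touched.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, River-free analogue of the test described above.
final class InterruptDoesNotPropagate {

    /**
     * Returns true if the worker's sleep was interrupted, false if the
     * sleep finished normally.
     */
    static boolean run(final long sleepMillis, final long interruptAfterMillis)
            throws Exception {
        // Stands in for the remote service (assumption: one worker thread).
        final ExecutorService worker = Executors.newSingleThreadExecutor();
        final AtomicBoolean workerInterrupted = new AtomicBoolean(false);
        try {
            final Future<?> remote = worker.submit(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(sleepMillis);
                    } catch (InterruptedException e) {
                        workerInterrupted.set(true);
                    }
                }
            });

            // Stands in for the local thread blocked in the synchronous RMI.
            final Thread caller = new Thread(new Runnable() {
                public void run() {
                    try {
                        remote.get();
                    } catch (Exception e) {
                        // InterruptedException is expected here.
                    }
                }
            });
            caller.start();

            Thread.sleep(interruptAfterMillis);
            caller.interrupt(); // interrupts the waiter, not the worker
            caller.join();

            remote.get(); // wait for the worker to finish its sleep
            return workerInterrupted.get();
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(final String[] args) throws Exception {
        System.out.println("worker interrupted: " + run(600, 200));
    }
}
```

The interrupt status belongs to one thread, so nothing reaches the worker unless the waiter explicitly forwards a cancellation.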


A snippet of this test is below. It depends on our test harness environment,
but I could isolate it into a River-only test if desired (this would be easier
if I had a pointer to a pattern for starting and killing the child process
that I could use for a River test). But first I wanted to see what people's
expectations are for the remote service when the RMI is interrupted. Should
there be an attempt to propagate the interrupt to the method on the remote
service?
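
To make the question concrete, one way an application could propagate the interrupt itself is sketched below. This is not a River feature and not what our harness does: the caller picks a task id, the service registers the running task under that id, and on interrupt the caller makes a second call to cancel the task by id. Everything here (CancellableService, callWithCancel, demo) is hypothetical and runs in a single JVM, with Future.get() standing in for the blocking RMI.

```java
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: explicit, application-level cancel by task id.
final class PropagateCancelSketch {

    /** Stands in for the remote service. */
    static final class CancellableService {
        private final ExecutorService executor = Executors.newCachedThreadPool();
        private final ConcurrentMap<UUID, Future<?>> running =
                new ConcurrentHashMap<UUID, Future<?>>();

        <T> Future<T> submit(final UUID id, final Callable<T> task) {
            final FutureTask<T> ft = new FutureTask<T>(new Callable<T>() {
                public T call() throws Exception {
                    try {
                        return task.call();
                    } finally {
                        running.remove(id); // deregister when the task ends
                    }
                }
            });
            running.put(id, ft); // register before the task can run
            executor.execute(ft);
            return ft;
        }

        /** Would be a second remote method in a real deployment. */
        boolean cancel(final UUID id) {
            final Future<?> f = running.remove(id);
            return f != null && f.cancel(true/* mayInterruptIfRunning */);
        }

        void shutdown() {
            executor.shutdownNow();
        }
    }

    /** Caller side: block on the result, forward an interrupt as a cancel. */
    static <T> T callWithCancel(final CancellableService svc,
            final Callable<T> task) throws Exception {
        final UUID id = UUID.randomUUID();
        final Future<T> f = svc.submit(id, task);
        try {
            return f.get();
        } catch (InterruptedException e) {
            svc.cancel(id); // explicit propagation of the interrupt
            throw e;
        }
    }

    /** Returns true if the task on the service observed the interrupt. */
    static boolean demo(final long sleepMillis, final long interruptAfterMillis)
            throws Exception {
        final CancellableService svc = new CancellableService();
        final CountDownLatch taskInterrupted = new CountDownLatch(1);
        final Callable<Void> sleepTask = new Callable<Void>() {
            public Void call() throws Exception {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    taskInterrupted.countDown();
                    throw e;
                }
                return null;
            }
        };
        final Thread caller = new Thread(new Runnable() {
            public void run() {
                try {
                    callWithCancel(svc, sleepTask);
                } catch (Exception expected) {
                    // InterruptedException is expected here.
                }
            }
        });
        caller.start();
        Thread.sleep(interruptAfterMillis);
        caller.interrupt();
        caller.join();
        try {
            return taskInterrupted.await(2, TimeUnit.SECONDS);
        } finally {
            svc.shutdown();
        }
    }

    public static void main(final String[] args) throws Exception {
        System.out.println("task saw interrupt: " + demo(5000, 200));
    }
}
```

The cost of this design is an extra remote method and a registry on the service, and the cancel call can itself fail or arrive after the task has finished, so the caller cannot assume the remote work was stopped.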


Thanks,

Bryan


java.io.IOException: request I/O interrupted
java.rmi.UnmarshalException: exception unmarshalling response; nested exception is:
    java.io.IOException: request I/O interrupted
    at net.jini.jeri.BasicInvocationHandler.invokeRemoteMethodOnce(BasicInvocationHandler.java:847)
    at net.jini.jeri.BasicInvocationHandler.invokeRemoteMethod(BasicInvocationHandler.java:659)
    at net.jini.jeri.BasicInvocationHandler.invoke(BasicInvocationHandler.java:528)
    at $Proxy2.submit(Unknown Source)
    at com.bigdata.journal.jini.ha.TestHAJournalServer$1.call(TestHAJournalServer.java:399)
    at com.bigdata.journal.jini.ha.TestHAJournalServer$1.call(TestHAJournalServer.java:1)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
Caused by: java.io.IOException: request I/O interrupted
    at com.sun.jini.jeri.internal.mux.Session$MuxInputStream.read(Session.java:833)
    at net.jini.jeri.connection.ConnectionManager$Outbound$Input.read(ConnectionManager.java:550)
    at net.jini.jeri.BasicObjectEndpoint.executeCall(BasicObjectEndpoint.java:410)
    at net.jini.jeri.BasicInvocationHandler.invokeRemoteMethodOnce(BasicInvocationHandler.java:806)
    ... 15 more
Caused by: java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:503)
    at com.sun.jini.jeri.internal.mux.Session$MuxInputStream.read(Session.java:829)
    ... 18 more


    /**
     * This test is used to characterize what happens when we interrupt an RMI.
     * Most methods on the {@link HAGlue} interface are synchronous - they block
     * while some behavior is executed. This is even true for some methods that
     * return a {@link Future} in order to avoid overhead associated with the
     * export of a proxy and DGC thread leaks (since fixed in River).
     * <p>
     * This unit test sets up a service and then issues an RMI that invokes a
     * {@link Thread#sleep(long)} method on the service. The thread that issues
     * the RMI is then interrupted during the sleep.
     *
     * @throws Exception
     */
    public void test_interruptRMI() throws Exception {

        // Start a service.
        final HAGlue serverA = startA();

        final AtomicReference<Throwable> localCause = new AtomicReference<Throwable>();

        final ExecutorService executorService = Executors
                .newSingleThreadScheduledExecutor(DaemonThreadFactory
                        .defaultThreadFactory());

        try {

            final FutureTask<Void> localFuture = new FutureTask<Void>(
                    new Callable<Void>() {

                        @Override
                        public Void call() throws Exception {

                            try {
                                final Future<Void> ft = ((HAGlueTest) serverA)
                                        .submit(new SleepTask(6000/* ms */),
                                                false/* asyncFuture */);

                                return ft.get();
                            } catch (Throwable t) {
                                localCause.set(t);
                                log.error(t, t);
                                throw new RuntimeException(t);
                            } finally {
                                log.warn("Local submit of remote task is done.");
                            }
                        }
                    });
            /*
             * Submit task that will execute sleep on A. This task will block
             * until A finishes its sleep. When we cancel this task, the RMI to
             * A will be interrupted.
             */
            executorService.execute(localFuture);

            // Wait a bit to ensure that the task was started on A.
            Thread.sleep(2000/* ms */);

            // Interrupt the local future. This will interrupt the RMI.
            localFuture.cancel(true/* mayInterruptIfRunning */);

        } finally {

            executorService.shutdownNow();

        }

        /*
         * The local root cause of the RMI failure is an InterruptedException.
         *
         * Note: There is a data race between when the [localCause] is set and
         * when we exit the code block above. This is because we are
         * interrupting the local task and have no means to await the completion
         * of its error handling routine which sets the [localCause].
         */
        {
            assertCondition(new Runnable() {
                @Override
                public void run() {
                    final Throwable tmp = localCause.get();
                    assertNotNull(tmp);
                    assertTrue(InnerCause.isInnerCause(tmp,
                            InterruptedException.class));
                }
            }, 10000/* timeout */, TimeUnit.MILLISECONDS);
        }

        /*
         * Verify the root cause as observed by A for the interrupt. It should
         * also be an InterruptedException.
         *
         * Note: Again, there is a data race.
         *
         * Note: Because we might retry this, we do NOT use the getAndClearXXX()
         * method to recover the remote exception.
         */
        {
            assertCondition(new Runnable() {
                @Override
                public void run() {
                    Throwable tmp;
                    try {
                        tmp = ((HAGlueTest) serverA).getLastRootCause();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    assertNotNull(tmp);
                    log.warn("Received non-null lastRootCause=" + tmp, tmp);
                    assertTrue(InnerCause.isInnerCause(tmp,
                            InterruptedException.class));
                }
            }, 10000/* timeout */, TimeUnit.MILLISECONDS);
        }

    }


    /**
     * Task sleeps for a specified duration.
     *
     * @author <a href="mailto:thompson...@users.sourceforge.net">Bryan
     *         Thompson</a>
     */
    private static class SleepTask extends IndexManagerCallable<Void> {

        private static final long serialVersionUID = 1L;

        private final long millis;

        SleepTask(final long millis) {
            this.millis = millis;
        }

        @Override
        public Void call() throws Exception {
            log.warn("Will sleep: millis=" + millis);
            try {
                Thread.sleep(millis);
                log.warn("Sleep finished normally.");
            } catch (Throwable t) {
                // Record the cause on the service so the test can inspect it.
                log.error("Exception during sleep: " + t, t);
                ((HAJournalTest) getIndexManager()).getRemoteImpl()
                        .setLastRootCause(t);
                throw new RuntimeException(t);
            } finally {
                log.warn("Did sleep: millis=" + millis);
            }
            return null;
        }

    }
