This is one of the places where a lease could help. An extension of the existing JERI machinery could add a lease to the dispatcher layer, so that a constant “I am here” message would come through to the service. If the client thread is interrupted, it would stop pinging, which tells the service that the client is no longer interested in the results. That would allow the service end to take appropriate action. I think I’d want the export operation, or exporter creation, to include the setup of a callback that fires when a client wants something to stop. I would make the API include a “correlation ID”, passed both into the call that does the work and into the cancellation callback.
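
A minimal sketch of the lease-plus-callback idea, in plain Java. Every name here (LeaseCancelSketch, CancellationListener, CallLeaseTracker, renew, complete, sweep) is hypothetical; none of it is part of JERI or River. A real version would sit in the dispatcher layer and receive the renewals over the wire. The clock is passed in explicitly only to keep the example deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LeaseCancelSketch {

    /** Hypothetical callback, registered when the service is exported. */
    public interface CancellationListener {
        void cancelled(String correlationId);
    }

    /** Hypothetical dispatcher-side lease table, keyed by correlation ID. */
    public static final class CallLeaseTracker {
        private final Map<String, Long> expiry = new ConcurrentHashMap<>();
        private final long leaseMillis;
        private final CancellationListener listener;

        public CallLeaseTracker(long leaseMillis, CancellationListener listener) {
            this.leaseMillis = leaseMillis;
            this.listener = listener;
        }

        /** Called for the initial request and for each "I am here" ping. */
        public void renew(String correlationId, long nowMillis) {
            expiry.put(correlationId, nowMillis + leaseMillis);
        }

        /** Called when the work finishes normally, so no cancellation fires. */
        public void complete(String correlationId) {
            expiry.remove(correlationId);
        }

        /** Fires the callback once for every call whose lease has lapsed. */
        public void sweep(long nowMillis) {
            for (Map.Entry<String, Long> e : expiry.entrySet()) {
                if (e.getValue() <= nowMillis
                        && expiry.remove(e.getKey(), e.getValue())) {
                    listener.cancelled(e.getKey());
                }
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService workers = Executors.newCachedThreadPool();
        Map<String, Future<?>> running = new ConcurrentHashMap<>();

        // On cancellation, interrupt the worker carrying that correlation ID.
        CallLeaseTracker leases = new CallLeaseTracker(1000, cid -> {
            Future<?> f = running.remove(cid);
            if (f != null) {
                f.cancel(true);
            }
        });

        String id = "call-1";
        leases.renew(id, 0); // initial request grants the lease
        running.put(id, workers.submit(() -> {
            try {
                Thread.sleep(60_000); // stands in for the remote work
            } catch (InterruptedException e) {
                System.out.println("worker interrupted");
            }
        }));

        leases.renew(id, 500); // client is still pinging
        leases.sweep(1200);    // lease is valid until 1500, nothing happens
        leases.sweep(2000);    // pings stopped, lease lapsed, worker is cancelled

        workers.shutdown();
    }
}
```

The service never needs to know why the pings stopped. An interrupted client thread, a crashed client, and a network partition all look the same, and the correlation ID tells the callback which unit of work to stop.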
Gregg

On Dec 20, 2013, at 7:12 AM, Bryan Thompson <br...@systap.com> wrote:

> Hello, I put together a test to examine what would occur if a local service
> submits an RMI (via JERI) to a remote service. This unit test sets up a
> remote service (in a child JVM) and then issues an RMI that invokes a
> {@link Thread#sleep(long)} method on the remote service. The thread that
> issues the RMI is then interrupted during the sleep. The exception when the
> local thread is interrupted is provided below.
>
> With reference to the SleepTask below, what I see on the remote service is:
>
> WARN : 08:03:48,137 2861 com.bigdata.journal.jini.ha.HAJournalTest.executorService1 com.bigdata.journal.jini.ha.TestHAJournalServer$SleepTask.call(TestHAJournalServer.java:498): Will sleep: millis=6000
> …
> WARN : 08:03:54,138 8862 com.bigdata.journal.jini.ha.HAJournalTest.executorService1 com.bigdata.journal.jini.ha.TestHAJournalServer$SleepTask.call(TestHAJournalServer.java:501): Sleep finished normally.
> WARN : 08:03:54,139 8863 com.bigdata.journal.jini.ha.HAJournalTest.executorService1 com.bigdata.journal.jini.ha.TestHAJournalServer$SleepTask.call(TestHAJournalServer.java:508): Did sleep: millis=6000
>
> Thus, it appears that the interrupt of the local thread making the RMI does
> NOT interrupt the thread that is executing the behavior on the remote service
> (in this case, Thread.sleep()).
>
> A snip of this test is below – it depends on our test harness environment,
> but I could isolate it to a River-only test if desired (this would be easier
> if I had a pointer to a pattern for starting and killing the child process
> that I could use for a River test). But first I wanted to see what people's
> expectations were for the remote service when the RMI is interrupted. Should
> there be an attempt to propagate the interrupt to the method on the remote
> service?
>
> Thanks,
> Bryan
>
> java.io.IOException: request I/O interrupted
> java.rmi.UnmarshalException: exception unmarshalling response; nested exception is:
>     java.io.IOException: request I/O interrupted
>     at net.jini.jeri.BasicInvocationHandler.invokeRemoteMethodOnce(BasicInvocationHandler.java:847)
>     at net.jini.jeri.BasicInvocationHandler.invokeRemoteMethod(BasicInvocationHandler.java:659)
>     at net.jini.jeri.BasicInvocationHandler.invoke(BasicInvocationHandler.java:528)
>     at $Proxy2.submit(Unknown Source)
>     at com.bigdata.journal.jini.ha.TestHAJournalServer$1.call(TestHAJournalServer.java:399)
>     at com.bigdata.journal.jini.ha.TestHAJournalServer$1.call(TestHAJournalServer.java:1)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:722)
> Caused by: java.io.IOException: request I/O interrupted
>     at com.sun.jini.jeri.internal.mux.Session$MuxInputStream.read(Session.java:833)
>     at net.jini.jeri.connection.ConnectionManager$Outbound$Input.read(ConnectionManager.java:550)
>     at net.jini.jeri.BasicObjectEndpoint.executeCall(BasicObjectEndpoint.java:410)
>     at net.jini.jeri.BasicInvocationHandler.invokeRemoteMethodOnce(BasicInvocationHandler.java:806)
>     ... 15 more
> Caused by: java.lang.InterruptedException
>     at java.lang.Object.wait(Native Method)
>     at java.lang.Object.wait(Object.java:503)
>     at com.sun.jini.jeri.internal.mux.Session$MuxInputStream.read(Session.java:829)
>     ... 18 more
>
> /**
>  * This test is used to characterize what happens when we interrupt an RMI.
>  * Most methods on the {@link HAGlue} interface are synchronous - they block
>  * while some behavior is executed. This is even true for some methods that
>  * return a {@link Future} in order to avoid overhead associated with the
>  * export of a proxy and DGC thread leaks (since fixed in River).
>  * <p>
>  * This unit test setups up a service and then issues an RMI that invokes a
>  * {@link Thread#sleep(long)} method on the service. The thread that issues
>  * the RMI is then interrupted during the sleep.
>  *
>  * @throws Exception
>  */
> public void test_interruptRMI() throws Exception {
>
>     // Start a service.
>     final HAGlue serverA = startA();
>
>     final AtomicReference<Throwable> localCause = new AtomicReference<Throwable>();
>
>     final ExecutorService executorService = Executors
>             .newSingleThreadScheduledExecutor(DaemonThreadFactory
>                     .defaultThreadFactory());
>
>     try {
>
>         final FutureTask<Void> localFuture = new FutureTask<Void>(
>                 new Callable<Void>() {
>
>                     @Override
>                     public Void call() throws Exception {
>
>                         try {
>                             final Future<Void> ft = ((HAGlueTest) serverA)
>                                     .submit(new SleepTask(6000/* ms */),
>                                             false/* asyncFuture */);
>
>                             return ft.get();
>                         } catch (Throwable t) {
>                             localCause.set(t);
>                             log.error(t, t);
>                             throw new RuntimeException(t);
>                         } finally {
>                             log.warn("Local submit of remote task is done.");
>                         }
>                     }
>                 });
>         /*
>          * Submit task that will execute sleep on A. This task will block
>          * until A finishes its sleep. When we cancel this task, the RMI to
>          * A will be interrupted.
>          */
>         executorService.execute(localFuture);
>
>         // Wait a bit to ensure that the task was started on A.
>         Thread.sleep(2000/* ms */);
>
>         // interrupt the local future. will cause interrupt of the RMI.
>         localFuture.cancel(true/*mayInterruptIfRunning*/);
>
>     } finally {
>
>         executorService.shutdownNow();
>
>     }
>
>     /*
>      * The local root cause of the RMI failure is an InterruptedException.
>      *
>      * Note: There is a data race between when the [localCause] is set and
>      * when we exit the code block above. This is because we are
>      * interrupting the local task and have no means to await the completion
>      * of its error handling routine which sets the [localCause].
>      */
>     {
>         assertCondition(new Runnable() {
>             @Override
>             public void run() {
>                 final Throwable tmp = localCause.get();
>                 assertNotNull(tmp);
>                 assertTrue(InnerCause.isInnerCause(tmp,
>                         InterruptedException.class));
>             }
>         }, 10000/*timeout*/, TimeUnit.MILLISECONDS);
>     }
>
>     /*
>      * Verify the root cause as observed by A for the interrupt. It should
>      * also be an InterruptedException.
>      *
>      * Note: Again, there is a data race.
>      *
>      * Note: Because we might retry this, we do NOT use the getAndClearXXX()
>      * method to recover the remote exception.
>      */
>     {
>         assertCondition(new Runnable() {
>             @Override
>             public void run() {
>                 Throwable tmp;
>                 try {
>                     tmp = ((HAGlueTest) serverA).getLastRootCause();
>                 } catch (IOException e) {
>                     throw new RuntimeException(e);
>                 }
>                 assertNotNull(tmp);
>                 log.warn("Received non-null lastRootCause=" + tmp, tmp);
>                 assertTrue(InnerCause.isInnerCause(tmp,
>                         InterruptedException.class));
>             }
>         }, 10000/* timeout */, TimeUnit.MILLISECONDS);
>     }
>
> }
>
> /**
>  * Task sleeps for a specified duration.
>  *
>  * @author <a href="mailto:thompson...@users.sourceforge.net">Bryan
>  *         Thompson</a>
>  */
> private static class SleepTask extends IndexManagerCallable<Void> {
>
>     private static final long serialVersionUID = 1L;
>
>     private long millis;
>
>     SleepTask(final long millis) {
>         this.millis = millis;
>     }
>
>     @Override
>     public Void call() throws Exception {
>         log.warn("Will sleep: millis=" + millis);
>         try {
>             Thread.sleep(millis);
>             log.warn("Sleep finished normally.");
>         } catch (Throwable t) {
>             log.error("Exception during sleep: "+t, t);
>             ((HAJournalTest) getIndexManager()).getRemoteImpl()
>                     .setLastRootCause(t);
>             throw new RuntimeException(t);
>         } finally {
>             log.warn("Did sleep: millis=" + millis);
>         }
>         return null;
>     }
>
> }