Torsten Curdt wrote:
If I understand this correctly, for me to capture the continuation and
run it later, I do:

Have a look into the serialization testcase

Thanks. I did some experiments. I think I have misunderstood the way the stack restoration works. It seems like it's actually replacing local variable #0 (the 'this' pointer.)

This is interesting. I didn't know you could do this in Java. (I guess you can do it in the JVM, just not in the Java language.)





(I'm assuming that you designed ContinuationContext to
keep track of things you want to change between runs, so it's not reachable from the Continuation while it's not executing.)

Sorry, I did not get that... what do you mean?

The ContinuationContext holds all the references
that are required to restore the state but cannot
really be part of the continuation. Logger,
ComponentManager and things like that.

Yes, I think we are saying the same thing. Pardon my poor English.


See the example in my blog ...which is actually
taken from the Cocoon integration. (I think in
there is also the link to the class) Would be
great if you could also have a look into that
class.

I looked at the Cocoon code you had in [1]. I didn't see anything in particular that made me rethink this (IOW, I don't see anything that absolutely requires MethodLookup or ContinuationContext instead of Runnable.)

But at the same time, I saw that you have an existing investment in the current javaflow Continuation API, and it's very understandable that you don't want it to change.

I see the desire to pass some contextual information to the invoked method, but that is a fairly common requirement even if you aren't using javaflow at all. javaflow just keeps it and stores it somewhere for later retrieval, which can be better done by the code that knows what the scope of a given context object is (like ThreadLocal, singleton, HttpSession, etc.)
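To illustrate the point about letting the calling code own the context scope, here is a minimal sketch of a thread-scoped context that does not go through the continuation API at all. The names RequestContext, runWith and ScopedContextDemo are hypothetical and only stand in for something like CocoonContinuationContext; this is not javaflow or Cocoon code.

```java
import java.util.function.Supplier;

/** Hypothetical per-request context; a stand-in for CocoonContinuationContext. */
final class RequestContext {
    private static final ThreadLocal<RequestContext> CURRENT = new ThreadLocal<>();

    private final String requestId;

    RequestContext(String requestId) {
        this.requestId = requestId;
    }

    String getRequestId() {
        return requestId;
    }

    /** Returns the context bound to the calling thread, or null if none is bound. */
    static RequestContext getCurrent() {
        return CURRENT.get();
    }

    /** Binds this context while the task runs and always restores the previous binding. */
    <T> T runWith(Supplier<T> task) {
        RequestContext previous = CURRENT.get();
        CURRENT.set(this);
        try {
            return task.get();
        } finally {
            if (previous == null) {
                CURRENT.remove();
            } else {
                CURRENT.set(previous);
            }
        }
    }
}

final class ScopedContextDemo {
    /** Flow code looks up its context without any help from the continuation. */
    static String flowStep() {
        RequestContext ctx = RequestContext.getCurrent();
        return ctx == null ? "no context" : "handling " + ctx.getRequestId();
    }

    public static void main(String[] args) {
        System.out.println(new RequestContext("req-1").runWith(ScopedContextDemo::flowStep));
        System.out.println(flowStep());
    }
}
```

The same shape would work with an HttpSession attribute or a singleton in place of the ThreadLocal; the only requirement is that the code doing the binding knows how long the context should live.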


Just to make sure that I'm not missing something, I rewrote the JavaInterpreter and CocoonContinuationContext by using the ContinuationThread class (which I renamed to Fiber) instead of the current Continuation API. The only difference is that you now do:

CocoonContinuationContext.getCurrent()

instead of:

(CocoonContinuationContext)Continuation.currentContinuation().getContext()

See the attachments for the exact code. All in all, I don't think this rewrite is ugly, but that is always a subjective issue.


Anyway, I'm no longer too keen on convincing you to change the API of the Continuation class. If you like my rewrite, that's good, but if not, that's also fine with me. The current Continuation API makes the Fiber class implementation uglier, but that's not too big a problem.

Instead, I'm hoping that you allow me to commit this Fiber class to javaflow, as this is what I primarily want to use in my code.



[1] http://svn.apache.org/repos/asf/cocoon/blocks/javaflow/trunk/
--
Kohsuke Kawaguchi
Sun Microsystems                   [EMAIL PROTECTED]
/*
 * Copyright 1999-2004 The Apache Software Foundation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cocoon.components.flow.java;

import org.apache.avalon.framework.context.Context;
import org.apache.avalon.framework.logger.Logger;
import org.apache.avalon.framework.parameters.Parameters;
import org.apache.avalon.framework.service.ServiceManager;
import org.apache.cocoon.environment.Redirector;
import org.apache.commons.javaflow.ContinuationContext;
import org.apache.commons.javaflow.Fiber;

/**
 * Helper class to associate Cocoon flow information with the continuation.
 *
 * @author <a href="mailto:[EMAIL PROTECTED]">Torsten Curdt</a>
 * @author <a href="mailto:[EMAIL PROTECTED]">Stephan Michels</a>
 * @version CVS $Id: CocoonContinuationContext.java 151736 2005-02-07 18:29:10Z tcurdt $
 */
public class CocoonContinuationContext {

    private static final ThreadLocal context = new ThreadLocal();

    public static CocoonContinuationContext getCurrent() {
        return (CocoonContinuationContext)context.get();
    }

    private Logger logger;
    private Context avalonContext;
    private ServiceManager manager;
    private Redirector redirector;
    
    private Parameters parameters;

    public void setAvalonContext(Context avalonContext) {
        this.avalonContext = avalonContext;
    }

    public Context getAvalonContext() {
        return avalonContext;
    }

    public void setLogger(Logger logger) {
        this.logger = logger;
    }

    public Logger getLogger() {
        return logger;
    }
    
    public void setServiceManager(ServiceManager manager) {
        this.manager = manager;
    }

    public ServiceManager getServiceManager() {
        return manager;
    }

    public void setRedirector(Redirector redirector) {
        this.redirector = redirector;
    }
 
    public Redirector getRedirector() {
        return redirector;
    }
    
    public Parameters getParameters() {
        return parameters;
    }

    public void setParameters(Parameters parameters) {
        this.parameters = parameters;
    }


    /*package*/ void register() {
        context.set(this);
    }

    /*package*/ void unregister() {
        context.set(null);
    }
}
/*
 * Copyright 1999-2004 The Apache Software Foundation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cocoon.components.flow.java;

import java.lang.reflect.Method;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.avalon.framework.parameters.Parameters;
import org.apache.cocoon.ProcessingException;
import org.apache.cocoon.components.ContextHelper;
import org.apache.cocoon.components.flow.AbstractInterpreter;
import org.apache.cocoon.components.flow.FlowHelper;
import org.apache.cocoon.components.flow.InvalidContinuationException;
import org.apache.cocoon.components.flow.WebContinuation;
import org.apache.cocoon.environment.Redirector;
import org.apache.commons.javaflow.ContinuationCapable;
import org.apache.commons.javaflow.Fiber;
import org.apache.commons.javaflow.utils.ReflectionUtils;
import org.apache.commons.jxpath.JXPathIntrospector;

/**
 * Implementation of the java flow interpreter.
 *
 * @author <a href="mailto:[EMAIL PROTECTED]">Stephan Michels</a>
 * @author <a href="mailto:[EMAIL PROTECTED]">Torsten Curdt</a>
 * @version CVS $Id: JavaInterpreter.java 151736 2005-02-07 18:29:10Z tcurdt $
 */
public final class JavaInterpreter extends AbstractInterpreter {

    private int timeToLive = 600000;

    static {
        JXPathIntrospector.registerDynamicClass(VarMap.class, VarMapHandler.class);
    }


    private Map methods = new HashMap();

    /**
     * Used to invoke the method specified.
     *
     * This class is continuation capable without instrumentation.
     */
    private static class Invoker implements Runnable, ContinuationCapable {
        private final Method method;
        private final Object instance;

        public Invoker(Method method) throws IllegalAccessException, InstantiationException {
            this.method = method;
            this.instance = method.getDeclaringClass().newInstance();
        }

        public void run() {
            try {
                method.invoke(instance,new Object[0]);
            } catch (IllegalAccessException e) {
                // TODO: log the error
            } catch (InvocationTargetException e) {
                // TODO: log the error
            }
        }
    }

    private CocoonContinuationContext createContinuationContext( final List params, final Redirector redirector ) {
        final CocoonContinuationContext context = new CocoonContinuationContext();
        
        context.setAvalonContext(avalonContext);
        context.setLogger(getLogger());
        context.setServiceManager(manager);
        context.setRedirector(redirector);

        final Parameters parameters = new Parameters();
        for(final Iterator i = params.iterator(); i.hasNext();) {
            final Argument argument = (Argument)i.next();
            parameters.setParameter(argument.name, argument.value);
        }
        context.setParameters(parameters);
        
        return context;
    }
    
    
    private void updateMethodIndex() throws ClassNotFoundException {
        final Map methods = new HashMap();

        for (final Iterator it = needResolve.iterator(); it.hasNext();) {
            final String clazzName = (String) it.next();

            if (getLogger().isDebugEnabled()) { 
                getLogger().debug("loading " + clazzName);
            }        

            final Class clazz = Thread.currentThread().getContextClassLoader().loadClass(clazzName);
            
            final Map m = ReflectionUtils.discoverMethods(
                    clazz,
                    new ReflectionUtils.DefaultMatcher(),
                    new ReflectionUtils.Indexer() {
                        public void put(final Map pMap, final String pKey, final Object pObject) {
                            final Method method = (Method) pObject;
                            
                            final String fullName = method.getDeclaringClass().getName() + "." + method.getName();
                            final String shortName = method.getName();
                            
                            pMap.put(shortName, method);
                            pMap.put(fullName, method);

                            if (getLogger().isDebugEnabled()) { 
                                getLogger().debug("registered method " + shortName + ", " + fullName); 
                            }        
                        }
                    }
                    );

            for (Iterator i = m.entrySet().iterator(); i.hasNext(); ) {
                final Map.Entry e = (Map.Entry) i.next();
                
                if (getLogger().isWarnEnabled() && methods.containsKey(e.getKey())) {
                    getLogger().warn("method name clash for " + e.getKey());
                }
                
                methods.put(e.getKey(), e.getValue());
            }        

        }
        
        // REVISIT: synchronize?
        this.methods = methods;
    }
    
    public void callFunction( final String methodName, final List params, final Redirector redirector ) throws Exception {

        // REVISIT: subscribe to jci events and only update accordingly
        updateMethodIndex();

        Method m = (Method)methods.get(methodName);
        if (m == null) {
            throw new ProcessingException("no method '" + methodName + "' found in " + methods);
        }

        final CocoonContinuationContext context = createContinuationContext(params, redirector);

        Fiber f = new Fiber(new Invoker(m));

        final WebContinuation wk = continuationsMgr.createWebContinuation(
                f, null, timeToLive, getInterpreterID(), null);

        FlowHelper.setWebContinuation(
                ContextHelper.getObjectModel(avalonContext), wk);

        context.register();
        try {
            f.start();
        } finally {
            context.unregister();
        }
    }

    public void handleContinuation( final String id, final List params, final Redirector redirector ) throws Exception {

        final WebContinuation oldWebContinuation = continuationsMgr.lookupWebContinuation(
                id, getInterpreterID());

        if (oldWebContinuation == null) {
            throw new InvalidContinuationException("invalid continuation id " + id);
        }

        final CocoonContinuationContext context = createContinuationContext(params, redirector);

        Fiber f = (Fiber) oldWebContinuation.getContinuation();
        f = f.fork();   // create a new copy

        final WebContinuation newWebContinuation = continuationsMgr.createWebContinuation(
                f, oldWebContinuation, timeToLive, getInterpreterID(), null);

        FlowHelper.setWebContinuation(
                ContextHelper.getObjectModel(avalonContext), newWebContinuation);

        context.register();
        try {
            f._continue();
        } finally {
            context.unregister();
        }
    }
}
/*
 * Copyright 1999-2004 The Apache Software Foundation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.javaflow;

import java.lang.reflect.Method;
import java.io.Serializable;

/**
 * A virtual "thread".
 *
 * <p>
 * A fiber to a thread is like a thread to a CPU.
 *
 * <p>
 * Fibers can emulate multiple independent execution flows on a single
 * (or possibly multiple) thread(s), just like threads emulate multiple
 * independent execution flows on a single (or possibly multiple) CPU(s).
 *
 * <p>
 * The biggest difference between a fiber and a thread is in
 * its pre-emptiveness. Whereas a thread loses a CPU involuntarily (or
 * even without noticing), a fiber loses a thread only when it
 * {@link #yield() yields}.
 *
 * <p>
 * Another difference is that a fiber is fully implemented at Java level.
 * This allows you to stop a fiber and resume it later, execute it
 * on another thread, or even move it to another JVM, by using
 * serialization.
 *
 *
 *
 *
 *
 *
 * <h2>Usage</h2>
 * <p>
 * A fiber goes through state transitions much like those of a {@link Thread}.
 * First, a {@link Fiber} gets created:
 *
 * <pre>
 * Fiber f = new Fiber(myRunnable);
 * </pre>
 *
 * <p>
 * Then it is started:
 *
 * <pre>
 * f.start();
 * </pre>
 *
 * <p>
 * At this point, a fiber is said to be {@link #isAlive() alive}.
 * The {@link Runnable#run()} method of <tt>myRunnable</tt> starts
 * executing by using the same thread. This execution continues until the fiber
 * yields or exits from the {@link Runnable#run()} method. When either of those
 * conditions is met, the thread used to run the fiber returns from the <tt>start</tt> method.
 *
 * <p>
 * The caller of the <tt>start</tt> method can then go on to do
 * some other things. Later, a fiber can be continued as follows:
 *
 * <pre>
 * f._continue();
 * </pre>
 *
 * <p>
 * At this point, a fiber resumes its execution from where it left off
 * before, again by using the same thread. Just like with
 * the <tt>start</tt> method, this execution continues until
 * the fiber yields or exits, and then the thread used to run a fiber
 * returns from the <tt>_continue</tt> method.
 *
 * <p>
 * Typically, the <tt>_continue</tt> method needs to be called repeatedly
 * for a fiber to complete its execution. Therefore, the simplest
 * execution loop would look like this:
 *
 * <pre>
 * while(f.isAlive())
 *     f._continue();
 * </pre>
 *
 *
 *
 *
 * <h2>Fiber Migration</h2>
 * <p>
 * A fiber is not tied to any particular thread. Therefore, it can be
 * started on one thread, then continued on another thread, and then
 * continued on yet another thread, and so on.
 *
 * This migration can happen while a fiber is not {@link #isExecuting() executing}.
 *
 * <p>
 * If all objects in a fiber's stack frames are {@link Serializable},
 * then a fiber can be serialized. This allows a fiber to be migrated
 * to another JVM. Note that for this to work correctly, all methods on
 * a fiber's stack frames must be exactly the same between two JVMs,
 * or else unpredictable behaviors will occur.
 *
 *
 * @author Kohsuke Kawaguchi
 */
public class Fiber implements Runnable, Continuable, Serializable, Cloneable {

    private /*final*/ Runnable target;

    private Continuation continuation;

    private boolean started = false;

    private boolean executing = false;

    /**
     * Creates a new {@link Fiber} that runs its {@link #run()} method
     * when started.
     */
    protected Fiber() {
        target = this;
    }

    /**
     * Creates a new {@link Fiber} that runs the {@link Runnable#run()} method
     * of the specified instance when started.
     */
    public Fiber(Runnable target) {
        this.target = target;
    }

    /**
     * Begins the execution of this fiber.
     *
     * <p>
     * Unlike a normal thread, this method blocks until the fiber {@link #yield() yields}
     * or completes.
     *
     * @throws IllegalStateException
     *      if this fiber has already been started.
     */
    public void start() {
        if(started) {
            throw new IllegalStateException("the fiber is already started");
        }
        started = true;
        executing = true;
        try {
            continuation = Continuation.startWith("",new ContextImpl(this));
        } finally {
            executing = false;
        }
    }

    /**
     * Resumes the execution of this fiber.
     *
     * <p>
     * This is analogous to the thread resume operation. The fiber will pick up
     * execution from where it {@link #yield() yielded} last time.
     *
     * This method blocks until the fiber {@link #yield() yields} or completes.
     */
    public void _continue() {
        if(!isAlive()) {
            throw new IllegalStateException("the fiber has already completed");
        }
        if(executing) {
            throw new IllegalStateException("the fiber is already executing");
        }
        executing = true;
        try {
            continuation = Continuation.continueWith(continuation,new ContextImpl(this));
        } finally {
            executing = false;
        }
    }

    /**
     * Causes the currently running fiber to pause.
     *
     * This method saves the execution state and then
     * returns from the {@link #start()} or {@link #_continue()} method.
     *
     * @throws IllegalStateException
     *      unless this method is called from a fiber.
     */
    public static void yield() {
        Continuation.suspend();
    }

    /**
     * Creates an exact replica of this fiber.
     *
     * <p>
     * A fiber can be forked when it's not executing. If this fiber
     * has yielded at execution point A, then the newly created fiber
     * returned from this method will resume its execution from A.
     * In a way, this is similar to Unix process fork.
     *
     * <p>
     * Objects on this fiber's stack frames aren't cloned, so the two
     * fibers will refer to the same objects. Alternatively, serialization
     * can also be used to perform a deep copy of a fiber.
     *
     * <p>
     * A fiber can be forked when it's not started yet, or when it's completed.
     *
     * @return
     *      always returns a non-null valid {@link Fiber} object.
     *
     * @throws IllegalStateException
     *      if this fiber is executing.
     */
    public Fiber fork() {
        if(executing) {
            throw new IllegalStateException("cannot fork an executing fiber");
        }

        try {
            return (Fiber)clone();
        } catch (CloneNotSupportedException e) {
            // we implement Cloneable on Fiber, so this is impossible
            throw new Error(e);
        }
    }

    /**
     * Returns true if this fiber is still alive.
     *
     * A fiber is alive if it has been started but has not yet died.
     * In particular, a fiber is alive even when it's not executing
     * on a thread.
     */
    public final boolean isAlive() {
        return continuation!=null;
    }

    /**
     * Returns true if this fiber is currently executing on a thread.
     */
    public final boolean isExecuting() {
        return executing;
    }

    /**
     * Should be overridden by derived classes to describe the code
     * that executes when the fiber runs.
     *
     * <p>
     * If this object was constructed with the {@link #Fiber(Runnable)} constructor,
     * it runs the <tt>run</tt> method of the specified <tt>Runnable</tt> object. Otherwise
     * it reports an error.
     */
    public void run() {
        // The no-arg constructor sets target to this fiber itself, so calling
        // target.run() in that case would recurse forever.
        if (target != null && target != this) {
            target.run();
        } else {
            throw new IllegalStateException(
                "This is most likely a mistake. You should either specify a Runnable object " +
                "as a constructor parameter, or you should override the run method.");
        }
    }

    /**
     * Gets the currently executing fiber.
     *
     * <p>
     * This method works like {@link Thread#currentThread()}.
     *
     * @return
     *      the fiber that is currently executing; never null.
     *
     * @throws IllegalStateException
     *      if no fiber is executing (IOW, if this method is called from outside
     *      a fiber.)
     */
    public static Fiber currentFiber() {
        // Guard against being called outside any continuation at all.
        Continuation current = Continuation.currentContinuation();
        ContinuationContext context = (current != null) ? current.getContext() : null;
        if (!(context instanceof ContextImpl)) {
            throw new IllegalStateException("no fiber is executing");
        }
        Fiber f = ((ContextImpl) context).fiber;
        if (!f.isExecuting()) {
            throw new Error("a bug in javaflow");
        }
        return f;
    }


    private static final class ContextImpl extends ContinuationContext implements MethodLookup {
        Fiber fiber;
        public ContextImpl(Fiber thread) {
            this.fiber = thread;
            setMethodLookup(this);
        }

        public Method getMethod(String method) {
            try {
                return Invoker.class.getDeclaredMethod("run",EMPTY_CLASSES);
            } catch (NoSuchMethodException e) {
                throw new Error(e); // impossible
            }
        }
        private static final Class[] EMPTY_CLASSES = new Class[0];
    }

    static final class Invoker implements ContinuationCapable, Serializable {
        /**
         * Called by {@link Continuation} and forwards the invocation to the fiber.
         * We can't have {@link Continuation} call the fiber's run() method directly
         * because we don't know if the fiber object has a no-arg constructor.
         *
         * This method is continuation capable without the byte code enhancements.
         */
        /*package*/ void run() { // so that it can be called from Continuation
            ContextImpl context = (ContextImpl)Continuation.currentContinuation().getContext();
            context.fiber.run();
        }
        private static final long serialVersionUID = 1L;
    }

    private static final long serialVersionUID = 1L;
}
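
The start / yield / _continue / isAlive contract described in the Fiber Javadoc above can be tried out without javaflow. The sketch below is only an emulation under stated assumptions: ToyFiber, pause and ToyFiberDemo are hypothetical names, and the toy hands control back and forth with a helper thread and two semaphores, whereas the real Fiber runs on the caller's thread through javaflow's Continuation. It shows the blocking behaviour only; it cannot be forked or serialized.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

/** Toy emulation of the Fiber lifecycle; NOT how javaflow implements it. */
final class ToyFiber {
    private static final ThreadLocal<ToyFiber> CURRENT = new ThreadLocal<>();

    private final Runnable target;
    private final Semaphore resumeSignal = new Semaphore(0); // caller -> fiber
    private final Semaphore pausedSignal = new Semaphore(0); // fiber -> caller
    private boolean started;
    private volatile boolean alive;

    ToyFiber(Runnable target) {
        this.target = target;
    }

    /** Runs the target until it pauses or completes, then returns to the caller. */
    void start() {
        if (started) {
            throw new IllegalStateException("the fiber is already started");
        }
        started = true;
        alive = true;
        Thread worker = new Thread(() -> {
            CURRENT.set(this);
            try {
                target.run();
            } finally {
                alive = false;
                pausedSignal.release(); // wake the caller one last time
            }
        });
        worker.setDaemon(true);
        worker.start();
        pausedSignal.acquireUninterruptibly();
    }

    /** Resumes the target from its last pause; blocks until the next pause or completion. */
    void _continue() {
        if (!alive) {
            throw new IllegalStateException("the fiber has already completed");
        }
        resumeSignal.release();
        pausedSignal.acquireUninterruptibly();
    }

    /** Plays the role of Fiber.yield(): hands control back to the caller. */
    static void pause() {
        ToyFiber f = CURRENT.get();
        if (f == null) {
            throw new IllegalStateException("not called from a fiber");
        }
        f.pausedSignal.release();
        f.resumeSignal.acquireUninterruptibly();
    }

    boolean isAlive() {
        return alive;
    }
}

final class ToyFiberDemo {
    /** Drives a three-step task with the execution loop from the Javadoc. */
    static List<String> runToCompletion() {
        List<String> log = new ArrayList<>();
        ToyFiber f = new ToyFiber(() -> {
            log.add("step 1");
            ToyFiber.pause();
            log.add("step 2");
            ToyFiber.pause();
            log.add("step 3");
        });
        f.start();
        log.add("caller after start");
        while (f.isAlive()) {
            f._continue();
            log.add("caller after _continue");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runToCompletion());
    }
}
```

Because the caller and the task strictly alternate, the log interleaves the task's steps with the caller's entries, which is the cooperative (non-preemptive) behaviour the Javadoc contrasts with ordinary threads.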
