I wrote a small piece of code by copying the logic of Loop and defining my own
'While'. As I have no experience whatsoever in contributing to Camel, and do not
know whether the proposed solution is to extend Loop or to rewrite the existing
one, I was asking whether there was a JIRA item.

There are two new classes (they could be named better; LoopToDefinition,
maybe?):
- WhileDefinition
- WhileProcessor

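Two methods would also be added to ProcessorDefinition (the names below are
placeholders, since 'while' is a reserved word in Java and cannot be used as a
method name):
- 'public void while(Expression)'
- 'public ExpressionClause<WhileDefinition> while()'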

The only open question, 'do-while' versus 'while', could perhaps be decided
either by a header or by an additional property.
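
To make the 'while' versus 'do-while' choice concrete, here is a minimal sketch in plain Java with no Camel classes involved. The names PredicateLoop, checkFirst and run are hypothetical and are not part of Camel or of the classes below; the sketch only shows how a single boolean property could select whether the predicate is checked before the first iteration.

```java
import java.util.function.BooleanSupplier;

public class PredicateLoop {

    // Hypothetical property: true behaves like 'while', false like 'do-while'.
    private final boolean checkFirst;

    public PredicateLoop(boolean checkFirst) {
        this.checkFirst = checkFirst;
    }

    /**
     * Runs the body for as long as the predicate holds and
     * returns the number of iterations that were executed.
     */
    public int run(BooleanSupplier predicate, Runnable body) {
        int iterations = 0;
        // 'while' semantics: skip the body entirely if the predicate is already false
        if (checkFirst && !predicate.getAsBoolean()) {
            return iterations;
        }
        // from here on both variants behave the same: run, then check
        do {
            body.run();
            iterations++;
        } while (predicate.getAsBoolean());
        return iterations;
    }
}
```

With a predicate that is false from the start, the 'while' variant never runs the body, whereas the 'do-while' variant runs it exactly once.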

Regards,
Arpit.

Below are the two classes :)

///////////////////////////////
// BEGIN OF WhileProcessor //
///////////////////////////////
public class WhileProcessor extends DelegateAsyncProcessor implements Traceable {

    private static final Logger LOG = LoggerFactory.getLogger(WhileProcessor.class);

    private final Expression expression;
    private final boolean copy;

    public WhileProcessor(Processor processor, Expression expression, boolean copy) {
        super(processor);
        this.expression = expression;
        this.copy = copy;
    }

    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        // use atomic integer to be able to pass reference and keep track on the values
        AtomicInteger index = new AtomicInteger();
        AtomicBoolean atomicExpressionEvaluation = new AtomicBoolean();

        try {
            evaluateExpression(exchange, atomicExpressionEvaluation);
        } catch (NoTypeConversionAvailableException e) {
            exchange.setException(e);
            callback.done(true);
            return true;
        }

        // we hold on to the original Exchange in case it's needed for copies
        final Exchange original = exchange;

        // per-iteration exchange
        Exchange target = exchange;

        // a while loop has no fixed size, so expose the running iteration counter instead
        exchange.setProperty(Exchange.LOOP_SIZE, index);

        // loop synchronously
        while (atomicExpressionEvaluation.get()) {

            // and prepare for next iteration
            // if (!copy) target = exchange; else copy of original
            target = prepareExchange(exchange, index.get(), original);
            boolean sync = process(target, callback, index, atomicExpressionEvaluation, original);

            if (!sync) {
                LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
                // the remainder of the loop will be completed async, so we break out now;
                // the callback will then continue looping from where we left off
                return false;
            }

            LOG.trace("Processing exchangeId: {} is continued being processed synchronously", target.getExchangeId());

            // increment counter before next loop
            index.getAndIncrement();

            try {
                evaluateExpression(exchange, atomicExpressionEvaluation);
            } catch (NoTypeConversionAvailableException e) {
                exchange.setException(e);
                callback.done(true);
                return true;
            }
        }

        // we are done so prepare the result
        ExchangeHelper.copyResults(exchange, target);
        LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
        callback.done(true);
        return true;
    }

    protected boolean process(final Exchange exchange, final AsyncCallback callback,
                              final AtomicInteger index, final AtomicBoolean atomicExpressionEvaluation,
                              final Exchange original) {

        // set current index as property
        LOG.debug("WhileProcessor: iteration #{}", index.get());
        exchange.setProperty(Exchange.LOOP_INDEX, index.get());

        boolean sync = processor.process(exchange, new AsyncCallback() {
            public void done(boolean doneSync) {
                // we only have to handle async completion of the loop
                if (doneSync) {
                    return;
                }

                Exchange target = exchange;

                // increment index as we have just processed once
                index.getAndIncrement();

                // re-evaluate the expression, as the synchronous loop does after each iteration
                try {
                    evaluateExpression(exchange, atomicExpressionEvaluation);
                } catch (NoTypeConversionAvailableException e) {
                    exchange.setException(e);
                    callback.done(false);
                    return;
                }

                // continue looping asynchronously
                while (atomicExpressionEvaluation.get()) {

                    // and prepare for next iteration
                    target = prepareExchange(exchange, index.get(), original);

                    // process again
                    boolean sync = process(target, callback, index, atomicExpressionEvaluation, original);
                    if (!sync) {
                        LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
                        // the remainder of the loop will be completed async, so we break out now;
                        // the callback will then continue looping from where we left off
                        return;
                    }

                    // increment counter before next loop
                    index.getAndIncrement();

                    try {
                        evaluateExpression(exchange, atomicExpressionEvaluation);
                    } catch (NoTypeConversionAvailableException e) {
                        exchange.setException(e);
                        // we are completing asynchronously here, so signal done(false)
                        callback.done(false);
                        return;
                    }
                }

                // we are done so prepare the result
                ExchangeHelper.copyResults(exchange, target);
                LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
                callback.done(false);
            }
        });

        return sync;
    }

    /**
     * Prepares the exchange for the next iteration
     *
     * @param exchange the exchange
     * @param index the index of the next iteration
     * @param original the original exchange, used as the source in copy mode
     * @return the exchange to use
     */
    protected Exchange prepareExchange(Exchange exchange, int index, Exchange original) {
        if (copy) {
            // use a copy but let it reuse the same exchange id so it appears as one exchange
            // use the original exchange rather than the looping exchange (esp. with the async routing engine)
            return ExchangeHelper.createCopy(original, true);
        } else {
            ExchangeHelper.prepareOutToIn(exchange);
            return exchange;
        }
    }

    public Expression getExpression() {
        return expression;
    }

    public String getTraceLabel() {
        return "while[" + expression + "]";
    }

    @Override
    public String toString() {
        return "While[" + expression + " do: " + getProcessor() + "]";
    }

    private void evaluateExpression(Exchange exchange, AtomicBoolean atomicExpressionEvaluation)
            throws NoTypeConversionAvailableException {
        // Intermediate conversion to String is needed when direct conversion to Boolean
        // is not available but the evaluation result is a textual representation of a
        // boolean value.
        String expressionEvaluation = expression.evaluate(exchange, String.class);
        boolean result = ExchangeHelper.convertToMandatoryType(exchange, Boolean.class, expressionEvaluation);
        atomicExpressionEvaluation.set(result);
    }

}

///////////////////////////////
// BEGIN OF WhileDefinition //
///////////////////////////////
@XmlRootElement(name = "while")
@XmlAccessorType(XmlAccessType.FIELD)
public class WhileDefinition extends ExpressionNode {

        @XmlAttribute
        private Boolean copy;

        public WhileDefinition() {
        }

        public WhileDefinition(Expression expression) {
                super(expression);
        }

        public WhileDefinition(ExpressionDefinition expression) {
                super(expression);
        }

        /**
         * Enables copy mode so a copy of the input Exchange is used for each iteration.
         * That means each iteration will start from a copy of the same message.
         * <p/>
         * By default the loop reuses the same exchange for every iteration, so each
         * iteration may have different message content.
         *
         * @return the builder
         */
        public WhileDefinition copy() {
                setCopy(true);
                return this;
        }

        public void setExpression(Expression expr) {
                setExpression(ExpressionNodeHelper.toExpressionDefinition(expr));
        }

        public Boolean getCopy() {
                return copy;
        }

        public void setCopy(Boolean copy) {
                this.copy = copy;
        }

        public boolean isCopy() {
                // do not copy by default, matching the behavior of the existing Loop
                return copy != null ? copy : false;
        }

        @Override
        public String toString() {
                return "While[" + getExpression() + " -> " + getOutputs() + "]";
        }

        @Override
        public String getLabel() {
                return "while[" + getExpression() + "]";
        }

        @Override
        public String getShortName() {
                return "while";
        }

        @Override
        public Processor createProcessor(RouteContext routeContext) throws Exception {
                Processor output = this.createChildProcessor(routeContext, true);
                return new WhileProcessor(output, getExpression().createExpression(routeContext), isCopy());
        }
}


-----Original Message-----
From: Dale King [mailto:dalewk...@gmail.com]
Sent: Monday, September 30, 2013 10:01 PM
To: users@camel.apache.org
Subject: Re: Extend Camel Loop to support While Behavior

I thought for sure I read the Jira issue for adding a while loop since I
too had to figure out that work around, but I'll be darned if I can find
the Jira issue now.

I found a post from last year where Claus said, "There is a JIRA to make
the loop like a while loop so we can use a predicate to know if we should
continue looping or not. So someday in a future Camel release you can do
it." Perhaps that is what I am thinking of, but I cannot find the Jira that
Claus is referring to.

The concept for adding it is very simple. The one detail to work out is you
need to support while and do-while (whether the predicate is checked before
the first iteration or not).


On Mon, Sep 30, 2013 at 10:43 AM, Goyal, Arpit <arpit.go...@sap.com> wrote:

> Hi Dale,
>
> What do you mean by using a distribution list to set the header to NULL? Can
> you share an example (Spring DSL if possible)?
>         Do you mean that we should change the value of the headers
> "LOOP_INDEX" & "LOOP_SIZE"?
>
> Is this the JIRA item for the same? -->
> https://issues.apache.org/jira/browse/CAMEL-4564?jql=text%20~%20%22Loop%20EIP%22
>         Does this propose to create a new DSL element (loopTo)?
>
> Regards,
> Arpit.
>
> -----Original Message-----
> From: Dale King [mailto:d...@jadabeauty.com]
> Sent: Monday, September 30, 2013 7:24 PM
> To: users@camel.apache.org
> Subject: Re: Extend Camel Loop to support While Behavior
>
> There is a JIRA from a year or 2 ago that has no progress. This would be
> trivial to add.
>
> The work around is to use distribution list on a header and when you want
> to exit set that header to null.
>
> On Sep 30, 2013, at 2:28 AM, Claus Ibsen <claus.ib...@gmail.com> wrote:
>
> > Hi
> >
> > Yeah, we have talked about this in the past. Not sure if there is a
> > JIRA. You are welcome to look, and if you can't find a JIRA then log a
> > new JIRA for this.
> >
> >
> >
> > On Mon, Sep 30, 2013 at 6:01 AM, Goyal, Arpit <arpit.go...@sap.com> wrote:
> >> Hi Claus,
> >>
> >> Is there a plan for Camel Loop to support While behavior? Is there a
> >> JIRA issue for the same?
> >>
> >> Regards,
> >> Arpit.
> >
> >
> >
> > --
> > Claus Ibsen
> > -----------------
> > Red Hat, Inc.
> > Email: cib...@redhat.com
> > Twitter: davsclaus
> > Blog: http://davsclaus.com
> > Author of Camel in Action: http://www.manning.com/ibsen
>
>


--
Dale King
