Ok, I solved the issue.  After reading the source code of
org.apache.camel.processor.UnmarshalProcessor, I saw this little piece of code:

            } else if (result instanceof Message) {
                // the dataformat has probably set headers, attachments, etc. so let's use it as the outbound payload
                exchange.setOut((Message) result);

Which means that if a DataFormat implementation wants to set headers and/or attachments, its unmarshal method cannot simply return the result. It must create a new Message, set the result as the body of that Message, and return the Message instead.

I have to say, I have never seen this documented anywhere, nor any example of it in the book. Of course I understand that there are a lot of higher priority items to do, so I'm not complaining ;)

Well here's the revised sample DataFormat where you *can* successfully set new headers in the unmarshal implementation:

Regards,

Chris


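import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.spi.DataFormat;

class CustomDataFormat2 implements DataFormat {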

    @Override
    public Object unmarshal(Exchange exchange, InputStream stream)
            throws Exception {

        // To be able to set new headers and/or attachments, etc.
        // we need to wrap the result in a new Message and return
        // the new Message instead... who knew?
        Message msg = exchange.getIn().copy();

        List<String> result = new ArrayList<String>();

        BufferedReader in
            = new BufferedReader(new InputStreamReader(stream));

        String line = null;
        while((line = in.readLine()) != null) {
            result.add(line);
        }

        msg.setBody(result);

        // now it works!
        msg.setHeader("DISAPPEARING_DEMO_HEADER", "Will it be set?");
        return msg;
    }

    @Override
    public void marshal(Exchange exchange, Object graph, OutputStream stream)
            throws Exception {
        throw new UnsupportedOperationException("Not implemented.");
    }
}


On 9/20/2013 10:55 AM, Chris wrote:
Jan,

I really appreciate the help, unfortunately your test case does not even
reproduce the issue I am having.  I think I may not have been clear in
my original message.

The issue is that in a custom DataFormat, if I set a *new* header, that
header will be gone from the Exchange for the rest of the route,
downstream from the unmarshal (or marshal) call.

So the issue is just setting a header and checking if it's still there.
So the best way to recreate the problem is to create the simplest possible
DataFormat, because marshal/unmarshal is not the issue - the issue is
setting a new header *inside* the custom marshal/unmarshal methods. So I
created a test case whose custom DataFormat does the bare minimum - no
file I/O, no serialization - just string manipulation.

Maybe you and/or others can have a look and explain why this happens.

Thanks,

Chris

(the code formatting will be wrecked by the mailing list line length limit)

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;


import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;


public class SetHeaderDemo extends CamelTestSupport {

     @Test
     public void setHeaderInDataFormatProblem() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
         template.sendBodyAndHeader("direct:start", "one\ntwo\nthree\n",
                 "DEMO_HEADER", "Hello...");

         // This passes
         assertExpression(mock.getReceivedExchanges().get(0),
                 "simple", "${in.header.DEMO_HEADER}", "Hello...");

         // This FAILS... WHY??? *************
         assertExpression(mock.getReceivedExchanges().get(0),
                 "simple", "${in.header.DISAPPEARING_DEMO_HEADER}",
                 "Will it be set?");

         assertMockEndpointsSatisfied();
     }


     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {

         return new RouteBuilder() {
             @Override
             public void configure() {
                 from("direct:start")
                 // will try to add a new header to the Exchange IN msg
                 .unmarshal("customFmt")
                 // the new header is not there
                 .to("log://dataformat.demo?showAll=true&multiline=true&level=INFO")
                 .to("mock:result");
             }
         };
     }

     @Override
     protected JndiRegistry createRegistry() throws Exception {
         JndiRegistry registry = super.createRegistry();

         CustomDataFormat fmt = new CustomDataFormat();
         registry.bind("customFmt", fmt);
         return registry;
     }
}

/**
  * Simplest, contrived DataFormat impl to demonstrate that it's impossible
  * to set a new header in the DataFormat's implementation methods and
  * see the newly added header down-stream from the marshal/unmarshal
  * call(s).
  *
  */
class CustomDataFormat implements DataFormat {

     /**
      * Expects the body to be a newline-delimited list of strings,
      * which will be unmarshalled to a List<String> whose elements
      * are the "lines" in the "document".
      *
      * Obviously the marshal/unmarshal process is not important - the
      * issue is that if a new header is added in the DataFormat marshal or
      * unmarshal - it will be GONE after returning.
      */
     @Override
     public Object unmarshal(Exchange exchange, InputStream stream)
             throws Exception {
         List<String> result = new ArrayList<String>();

         BufferedReader in
             = new BufferedReader(new InputStreamReader(stream));

         String line = null;
         while((line = in.readLine()) != null) {
             result.add(line);
         }

         exchange.getIn().setHeader("DISAPPEARING_DEMO_HEADER",
                 "Will it be set?");
         return result;
     }

     @Override
     public void marshal(Exchange exchange, Object graph, OutputStream stream)
             throws Exception {
         throw new UnsupportedOperationException("Not implemented.");
     }
}




On 9/19/2013 4:13 AM, Jan Matèrne (jhm) wrote:
I tried building my own DF and that works (for me)

Jan


package org.apache.camel.dataformat;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;

import javax.activation.DataHandler;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.ExchangeBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.camel.util.FileUtil;
import org.apache.commons.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class SaveHeaderTest extends CamelTestSupport {

    private static File dataFile = new File("output/message.ser");

    DataFormat customDataFormat = new CustomDataFormat();

    @After
    @Before
    public void cleanup() {
        FileUtil.deleteFile(dataFile);
    }

    @SuppressWarnings("unchecked")
    @Test
    public void save() throws FileNotFoundException, Exception {
        File writtenTo = new File("output/message.ser");
        assertFalse(writtenTo.exists());
        Exchange exchange = ExchangeBuilder.anExchange(context)
                .withBody("Hello World")
                .withHeader("from", "Apache Camel")
                .withHeader("test", "save")
                .withHeader(Exchange.FILE_NAME, "message.ser")
                .build();
        template.send("direct:save", exchange);
        assertTrue(writtenTo.exists());

        // actual
        ObjectInputStream in = new ObjectInputStream(
                new FileInputStream(dataFile));
        Object body = in.readObject();
        Map<String, Object> headers =
                (Map<String, Object>) in.readObject();
        Map<String, DataHandler> attachments =
                (Map<String, DataHandler>) in.readObject();
        String messageId = (String) in.readObject();
        boolean fault = (boolean) in.readObject();

        assertEquals("Hello World", body);
        assertEquals("Apache Camel", headers.get("from"));
        assertEquals("save", headers.get("test"));
        assertTrue(attachments.isEmpty());
        assertEquals(exchange.getIn().getMessageId(), messageId);
        assertFalse(fault);
    }

    @Test
    public void load() throws Exception {
        // create test data
        ObjectOutputStream out = new ObjectOutputStream(
                new FileOutputStream(dataFile));
        out.writeObject("Hello World");

        Map<String,Object> headers = new HashMap<String, Object>();
        headers.put("from", "Apache Camel");
        headers.put("test", "load");
        out.writeObject(headers);

        Map<String, DataHandler> attachments =
                new HashMap<String, DataHandler>();
        out.writeObject(attachments);

        out.writeObject("message-id");
        out.writeObject(false);
        out.close();

        // load the saved 'exchange'
        Exchange loadCommand = ExchangeBuilder.anExchange(context)
                .withBody(dataFile.getAbsolutePath())
                .build();
        Exchange response = template.send("direct:load", loadCommand);
        assertEquals("Hello World",
                response.getOut().getBody(String.class));
        assertEquals("Apache Camel",
                response.getOut().getHeader("from"));
        assertEquals("load", response.getOut().getHeader("test"));
    }

    @Test
    public void turnaround() {
        Exchange exchange = ExchangeBuilder.anExchange(context)
                .withBody("Hello World")
                .withHeader("from", "Apache Camel")
                .build();
        Exchange response = template.send("direct:turnaround", exchange);
        assertEquals("Hello World",
                response.getOut().getBody(String.class));
        assertEquals("Apache Camel",
                response.getOut().getHeader("from"));
    }


    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:save")
                    .marshal(customDataFormat)
                    .to("file:output");
                from("direct:load")
                    .process(openFile())
                    .unmarshal(customDataFormat);
                from("direct:turnaround")
                    .marshal(customDataFormat)
                    .unmarshal(customDataFormat);
            }

            private Processor openFile() {
                return new Processor() {
                    @Override
                    public void process(Exchange exchange)
                            throws Exception {
                        String filename =
                                exchange.getIn().getBody(String.class);
                        FileInputStream stream =
                                new FileInputStream(filename);
                        exchange.getIn().setBody(stream);
                    }
                };
            }
        };
    }

    class CustomDataFormat implements DataFormat {

        @Override
        public void marshal(Exchange exchange, Object graph,
                OutputStream stream)
                throws Exception {
            ObjectOutputStream out = null;

            try {
                out = new ObjectOutputStream(stream);
                // Save the body
                out.writeObject(graph);
                // Save the message
                Message message = exchange.getIn();
                if (message != null) {
                    // Message is not serializable, so save the
                    // data individually
                    out.writeObject(message.getHeaders());
                    out.writeObject(message.getAttachments());
                    out.writeObject(message.getMessageId());
                    out.writeObject(message.isFault());
                }
            } finally {
                IOUtils.closeQuietly(out);
            }
        }

        @SuppressWarnings("unchecked")
        @Override
        public Object unmarshal(Exchange exchange, InputStream stream)
                throws Exception {

            ObjectInputStream in = null;
            try {
                in = new ObjectInputStream(stream);

                // read the raw data
                Object body = in.readObject();
                Map<String, Object> headers =
                        (Map<String, Object>) in.readObject();
                Map<String, DataHandler> attachments =
                        (Map<String, DataHandler>) in.readObject();
                String messageId = (String) in.readObject();
                boolean fault = (boolean) in.readObject();

                // build the message
                Message msg = new DefaultMessage();
                msg.setBody(body);
                msg.setAttachments(attachments);
                msg.setHeaders(headers);
                msg.setMessageId(messageId);
                msg.setFault(fault);

                return msg;
            } finally {
                IOUtils.closeQuietly(in);
            }
        }
    }

}

-----Original Message-----
From: Jan Matèrne (jhm) [mailto:apa...@materne.de]
Sent: Thursday, 19 September 2013 07:06
To: users@camel.apache.org
Subject: RE: How to set a header in custom DataFormat?

What I have found is that the exchange object which is passed to the
DataFormat is only used for getting the CamelContext.
I searched a little further and found the processor that does the
unmarshalling:

org.apache.camel.processor.UnmarshalProcessor.process(Exchange,
AsyncCallback)

And there is a note in it:

   Object result = dataFormat.unmarshal(exchange, stream);
   if (result instanceof Exchange) {
     if (result != exchange) {
       // it's not allowed to return another exchange other than the one
       // provided to dataFormat
       throw new RuntimeCamelException("The returned exchange " + result
           + " is not the same as " + exchange
           + " provided to the DataFormat");


The logic of that processor is:
- get the object from the stream
- if it is an exchange other than the one passed in, throw that exception
- if it is a message, store as 'out' on the exchange-parameter
- if it is something else, store it as body of the 'out' message
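
The logic listed above can be sketched in plain Java without any Camel dependency. This is only an illustration: Message, Exchange and Unmarshaller below are hypothetical stand-ins, not the real org.apache.camel types, and the step that prepares 'out' as a copy of 'in' before calling the data format is an assumption that the excerpt above does not show. Under that assumption, a header added to 'in' inside unmarshal never reaches 'out', while a header on a returned message does. Lambdas are used for brevity, so it needs Java 8 or later.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Sketch only: Message, Exchange and Unmarshaller are hypothetical
// stand-ins, NOT the real org.apache.camel types.
class UnmarshalDispatchSketch {

    static class Message {
        Object body;
        final Map<String, Object> headers = new HashMap<String, Object>();

        Message copy() {
            Message m = new Message();
            m.body = body;
            m.headers.putAll(headers);
            return m;
        }
    }

    static class Exchange {
        final Message in = new Message();
        Message out;
    }

    interface Unmarshaller {
        Object unmarshal(Exchange exchange, String input);
    }

    static void process(Exchange exchange, Unmarshaller format, String input) {
        // ASSUMPTION (not shown in the excerpt above): 'out' is prepared
        // as a copy of 'in' before the data format is invoked.
        exchange.out = exchange.in.copy();

        Object result = format.unmarshal(exchange, input);
        if (result instanceof Exchange) {
            if (result != exchange) {
                // a different exchange than the one provided is rejected
                throw new IllegalStateException(
                        "returned exchange is not the one provided");
            }
        } else if (result instanceof Message) {
            // a returned message replaces 'out', headers included
            exchange.out = (Message) result;
        } else {
            // anything else only becomes the body of 'out'
            exchange.out.body = result;
        }
    }

    public static void main(String[] args) {
        // Case 1: header set on 'in', plain result returned
        Exchange first = new Exchange();
        process(first, (ex, s) -> {
            ex.in.headers.put("NEW_HEADER", "x");
            return Arrays.asList(s.split("\n"));
        }, "one\ntwo");
        System.out.println(first.out.headers.containsKey("NEW_HEADER"));

        // Case 2: result wrapped in a message that carries the header
        Exchange second = new Exchange();
        process(second, (ex, s) -> {
            Message msg = ex.in.copy();
            msg.body = Arrays.asList(s.split("\n"));
            msg.headers.put("NEW_HEADER", "x");
            return msg;
        }, "one\ntwo");
        System.out.println(second.out.headers.containsKey("NEW_HEADER"));
    }
}
```

In this model the first case prints false and the second prints true. Whether the real processor behaves the same way depends on the assumption stated above.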


So have you tried setting the header on the 'out' message?


Jan



-----Original Message-----
From: Chris [mailto:cwolf.a...@gmail.com]
Sent: Wednesday, 18 September 2013 22:58
To: users@camel.apache.org
Subject: How to set a header in custom DataFormat?

I implemented a custom DataFormat and in the "unmarshal(Exchange
exchange, InputStream stream)" implementation I set a header, but upon
attempting to retrieve the header downstream from the "unmarshal"
call, the header is not there.  Since the pattern is InOnly, I added
the header to the in Message.  I've done similar with custom
Processors and in that case, it works.  What is different with
DataFormat?  Is there any way to set header(s) from DataFormat
marshal/unmarshal?

Thanks,

Chris

