Claus,
Please forgive all the @ in here as that was the only way I could post this
to get around the spam filter.

Just to get this working for me so I can start refactoring my code to accept
and process individual items from the unmarshaling of the multi-line beanio
beans I hacked this bit of code.  I don't want to use it but it let's me
proceed until I can get something like this to work with the Camel file
endpoint.

As you can see I'm injecting a seda queue in here to drop the individual
items on and a directory w@tcher to w@tch for incoming files.  My unit test
copies a file over into the directory and I can see the results.  I have to
put the exclusive lock mechanics in there or the bean reader would try to
read the file before it was truly available.  But I don't want to replicate
all the mechanics that are already baked in and hardened in the Camel File
endpoint.

Doing it this way though changes this from a pull mechanism to a
push/publish mechanism that I prefer.

I have a couple of questions about how I might proceed with this.  I'm fine
with reading the Beanio stream/definition from my blueprint bundle with
getResourceAsStream and foregoing the use of the Camel marsahaler itself. 
If I do that, what mecahnics would I use to get a notification from the File
endpoint that a new file has been dropped into the directory.  Just a
standard Camel Processor?  

You'd mentioned an iterator/reader object and noticed some in the BeanIO
code itself but I don't think I saw anything in Camel itself. 

I did try just chunking the raw text but the problem with that is if I read
in a chunk of 100 records the 100th might well fall in the middle of a
record.

Thanks for any insights.

@EndpointInject(uri = "seda:process")
        private ProducerTemplate process;

        Logger logger = Logger.getLogger(AHSBatchReader.class);

        private StreamFactory factory;
        private w@tchService w@tcher;
        Path dir;

        public void init() throws IOException {
                logger.info("Starting stream factory...");
                // create a StreamFactory
                factory = StreamFactory.newInstance();
                // load the mapping file...switch to use getResourceAsStream 
from bundle.
                
factory.load("src/main/resources/dataformats/beanio-ahs-in.xml");

                w@tcher = FileSystems.getDefault().neww@tchService();
                dir = FileSystems.getDefault().getPath("/test/in");
                dir.register(w@tcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
                new Thread(this).start();
        }

        public void run() {
                while (true) {
                        w@tchKey key;
                        try {
                                key = w...@tcher.take();

                                for (w@tchEvent<?> event : key.pollEvents()) {
                                        w...@tchevent.kind kind = event.kind();
                                        w@tchEvent<Path> ev = cast(event);
                                        Path name = ev.context();
                                        Path child = dir.resolve(name);

                                        // print out event
                                        System.out.format("%s: %s\n", 
event.kind().name(), child);
                                        if (event.kind() == ENTRY_MODIFY) {
                                                File file = child.toFile();
                                                boolean exclusivelyLocked = 
false;
//Get an exclusive lock on the file if we can.  If we can't then the system
is still using and it we delay start of beanio reading.
                                                while (!exclusivelyLocked) {
                                                        try {
                                                                FileChannel 
channel = new RandomAccessFile(file, "rw").getChannel();
                                                                FileLock lock = 
null;
                                                                try {
                                                                        lock = 
channel.tryLock();

                                                                        
System.out.println(lock);
                                                                        
System.out.println(lock.isShared());
                                                                        
System.out.println(lock.isValid());
                                                                        
exclusivelyLocked = lock.isValid() && !lock.isShared();
                                                                } catch 
(Exception e) {
                                                                        // File 
is open by someone else
                                                                        
Thread.sleep(500);
                                                                }
//Release them now so that Beanio can grab them.
                                                                lock.close();
                                                                channel.close();
                                                        } catch (Exception e) {
                                                        }
                                                }
//Start processing the beanio stream.
                                                execute(file);

                                        }
                                }

                        } catch (InterruptedException x) {
                        }
                }

        }

        @SuppressWarnings("unchecked")
        static <T> w@tchEvent<T> cast(w@tchEvent<?> event) {
                return (w@tchEvent<T>) event;
        }

        private void execute(File file) {

                BeanReader in = factory.createReader("myInputStream",
file.getAbsoluteFile());
                Object o;
                while ((o = in.read()) != null) {
                        //Send these to the SEDA queue
                        process.sendBody(o);
                }

                in.close();
                file.delete();

        }

}




--
View this message in context: 
http://camel.465427.n5.nabble.com/Beanio-stream-tp5780468p5780717.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to