Claus,

Just to get this working for me so I can start refactoring my code to accept and process individual items from the unmarshalling of the multi-line BeanIO beans, I hacked together the bit of code below. I don't want to keep it, but it lets me proceed until I can get something like this working with the Camel file endpoint. As you can see, I'm injecting a SEDA queue in here to drop the individual items on, plus a directory watcher to watch for incoming files. My unit test copies a file into the directory and I can see the results. I have to put the exclusive-lock mechanics in there or the bean reader would try to read the file before it was truly available, but I don't want to replicate all the mechanics that are already baked in and hardened in the Camel file endpoint. Doing it this way does, however, change this from a pull mechanism to a push/publish mechanism, which I prefer.

I have a couple of questions about how I might proceed. I'm fine with reading the BeanIO stream definition from my blueprint bundle with getResourceAsStream and forgoing the use of the Camel marshaller itself. If I do that, what mechanics would I use to get a notification from the file endpoint that a new file has been dropped into the directory? Just a standard Camel Processor? (I've put a rough sketch of what I'm picturing at the bottom of this message.) You'd mentioned an iterator/reader object, and I noticed some in the BeanIO code itself, but I don't think I saw anything in Camel itself. I did try just chunking the raw text, but the problem with that is that if I read in a chunk of 100 records, the 100th might well fall in the middle of a record.

Thanks for any insights.

import static java.nio.file.StandardWatchEventKinds.*;

import java.io.*;
import java.nio.channels.*;
import java.nio.file.*;

import org.apache.camel.EndpointInject;
import org.apache.camel.ProducerTemplate;
import org.apache.log4j.Logger;
import org.beanio.BeanReader;
import org.beanio.StreamFactory;

public class AHSBatchReader implements Runnable {

    @EndpointInject(uri = "seda:process")
    private ProducerTemplate process;

    Logger logger = Logger.getLogger(AHSBatchReader.class);

    private StreamFactory factory;
    private WatchService watcher;
    Path dir;

    public void init() throws IOException {
        logger.info("Starting stream factory...");

        // create a StreamFactory
        factory = StreamFactory.newInstance();
        // load the mapping file...switch to use getResourceAsStream from the bundle.
        factory.load("src/main/resources/dataformats/beanio-ahs-in.xml");

        watcher = FileSystems.getDefault().newWatchService();
        dir = FileSystems.getDefault().getPath("/test/in");
        dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);

        new Thread(this).start();
    }

    public void run() {
        while (true) {
            WatchKey key;
            try {
                key = watcher.take();
                for (WatchEvent<?> event : key.pollEvents()) {
                    WatchEvent<Path> ev = cast(event);
                    Path name = ev.context();
                    Path child = dir.resolve(name);

                    // print out the event
                    System.out.format("%s: %s\n", event.kind().name(), child);

                    if (event.kind() == ENTRY_MODIFY) {
                        File file = child.toFile();
                        boolean exclusivelyLocked = false;

                        // Get an exclusive lock on the file if we can. If we can't, the
                        // system is still writing it and we delay the start of BeanIO reading.
                        while (!exclusivelyLocked) {
                            try (FileChannel channel = new RandomAccessFile(file, "rw").getChannel()) {
                                FileLock lock = channel.tryLock();
                                if (lock != null && lock.isValid() && !lock.isShared()) {
                                    exclusivelyLocked = true;
                                    // Release it right away so that BeanIO can grab the file.
                                    lock.release();
                                } else {
                                    // Someone else still has the file open.
                                    Thread.sleep(500);
                                }
                            } catch (Exception e) {
                                // Could not lock yet; wait and retry.
                                try { Thread.sleep(500); } catch (InterruptedException ignore) { }
                            }
                        }

                        // Start processing the BeanIO stream.
                        execute(file);
                    }
                }
                key.reset();
            } catch (InterruptedException x) {
                return;
            }
        }
    }

    @SuppressWarnings("unchecked")
    static <T> WatchEvent<T> cast(WatchEvent<?> event) {
        return (WatchEvent<T>) event;
    }

    private void execute(File file) {
        BeanReader in = factory.createReader("myInputStream", file.getAbsoluteFile());
        Object o;
        while ((o = in.read()) != null) {
            // Send each unmarshalled record to the SEDA queue.
            process.sendBody(o);
        }
        in.close();
        file.delete();
    }
}
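
For what it's worth, here is a rough sketch of the receiving bean I'm picturing. It loads the mapping from the bundle classpath with getResourceAsStream instead of a filesystem path and pushes one SEDA message per unmarshalled record. The class name AHSBatchProcessor, the handle method, and the "myInputStream" stream name are just placeholders of mine, and I'm assuming the bean is declared in the blueprint so Camel processes the @EndpointInject.

import java.io.InputStream;
import java.io.InputStreamReader;

import org.apache.camel.EndpointInject;
import org.apache.camel.ProducerTemplate;
import org.beanio.BeanReader;
import org.beanio.StreamFactory;

public class AHSBatchProcessor {

    @EndpointInject(uri = "seda:process")
    private ProducerTemplate process;

    private StreamFactory factory;

    public void init() throws Exception {
        factory = StreamFactory.newInstance();
        // Load the mapping from the bundle classpath rather than a file path.
        InputStream mapping = getClass().getResourceAsStream("/dataformats/beanio-ahs-in.xml");
        factory.load(mapping);
    }

    // Called from the route below; Camel converts the file body to an InputStream.
    public void handle(InputStream body) throws Exception {
        BeanReader in = factory.createReader("myInputStream", new InputStreamReader(body));
        try {
            Object record;
            while ((record = in.read()) != null) {
                // One SEDA message per record keeps the push/publish behaviour.
                process.sendBody(record);
            }
        } finally {
            in.close();
        }
    }
}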
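
The notification piece would then just be a plain file-endpoint route handing each new file to that bean, so the endpoint's hardened read-lock handling replaces my RandomAccessFile/FileLock loop. I'm guessing at the options here (readLock=changed, delete=true) and at the bean name in the registry:

import org.apache.camel.builder.RouteBuilder;

public class AHSBatchRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // The file endpoint does the watching and locking; the bean does the BeanIO split.
        from("file:/test/in?readLock=changed&delete=true")
            .to("bean:ahsBatchProcessor?method=handle");
    }
}

That way the SEDA consumers still see one record at a time, which is the push behaviour I'm after.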