Hi, I’m working on the subject “Using a cache with Apache Camel to process large files”.
The goal is to process a huge file (over 5 GB) with Camel without loading it into memory. We found several possible approaches. The first is the Splitter component, which lets us read the file in streaming mode, for example line by line or block by block. The problem is that once the split is finished, the Splitter alone gives us no way to read the file from the beginning again, and we have a functional requirement to re-read parts of the file even after the split has completed. So we need a cache: we put the blocks into a cache so that we can reuse them later. Our idea was that we had to use the CachedOutputStream class to write parts of the file to disk after the splitter; this class can also encrypt the data it spools to disk. Example below (a Java DSL sketch of the split route is at the end of this message):

<camelContext xmlns="http://camel.apache.org/schema/spring" trace="false" streamCache="true">

    <streamCaching id="myCacheConfig" spoolDirectory="target/cachedir" spoolThreshold="16"/>

    <route id="SPLIT-FLOW" streamCache="true">
        <from uri="file:src/data/forSplitCaching/SimpleRecord?noop=true"/>
        <split streaming="true">
            <tokenize token="\n"/>
            <to uri="direct:PROCESS-BUSINESS"/>
        </split>
    </route>

    <route id="PROCESS-BUSINESS" streamCache="true">
        <from uri="direct:PROCESS-BUSINESS"/>
        <bean ref="ProcessBusiness" method="dealRecord"/>
        <choice>
            <when>
                <!-- CamelSplitComplete is an exchange property, not a header -->
                <simple>${property.CamelSplitComplete} == true</simple>
                <to uri="direct:STREAM-CACHING"/>
            </when>
        </choice>
    </route>

    <route id="STREAM-CACHING">
        <from uri="direct:STREAM-CACHING"/>
        <bean ref="ProcessStreamCaching" method="usingStream"/>
        <setHeader headerName="CamelFileName">
            <simple>${header.CamelFileName}.${header.CamelSplitIndex}</simple>
        </setHeader>
        <to uri="file:src/out"/>
    </route>

</camelContext>

The method dealRecord puts each split line into a cache:

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.converter.stream.CachedOutputStream;

// Field of the bean: accumulates one CachedOutputStream per record
private final List<CachedOutputStream> lstCache = new ArrayList<>();

public void dealRecord(Exchange exchange) throws Exception {
    String body = exchange.getIn().getBody(String.class);
    boolean isSplitComplete = (boolean) exchange.getProperty("CamelSplitComplete");

    // closedOnCompletion=false so the spool files are not cleaned up
    // when the exchange completes; we want to replay them later
    CachedOutputStream cos = new CachedOutputStream(exchange, false);
    cos.write(body.getBytes("UTF-8"));

    // Log the temp files created so far in the spool directory
    File spoolDir = new File("target/cachedir");
    for (String tmpFileName : spoolDir.list()) {
        LOG.info("Generated file [" + tmpFileName + "]");
    }

    lstCache.add(cos);
    if (isSplitComplete) {
        exchange.getIn().setHeader("Cached", lstCache);
    }
}

The method usingStream then reads back every cache entry found in that header:

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import org.apache.camel.Exchange;
import org.apache.camel.converter.stream.CachedOutputStream;
import org.apache.camel.util.IOHelper;

public byte[] usingStream(Exchange exchange) throws InputStreamException {
    StringBuilder messageCompleteOut = new StringBuilder();

    @SuppressWarnings("unchecked")
    final ArrayList<CachedOutputStream> lstcache =
            (ArrayList<CachedOutputStream>) exchange.getIn().getHeader("Cached");

    for (CachedOutputStream oneCache : lstcache) {
        InputStream is = null;
        try {
            // Replays the cached bytes, from memory or from the spool file
            is = oneCache.getWrappedInputStream();
            String messageInputstream = toString(is); // local helper, sketched below
            LOG.info("Message of cache [" + messageInputstream + "]");
            messageCompleteOut.append(messageInputstream);
            messageCompleteOut.append(System.lineSeparator());
        } catch (IOException e) {
            LOG.error(InputStreamException.ERROR_MANIPULATING_INPUT_STREAM_CHANNEL);
            throw new InputStreamException(InputStreamException.ERROR_MANIPULATING_INPUT_STREAM_CHANNEL, e);
        } finally {
            // Close the stream
            IOHelper.close(is);
        }
    }

    return messageCompleteOut.toString().getBytes(Charset.forName("UTF-8"));
}
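The toString(InputStream) helper is not shown above; it is just a small local method. A plausible version in plain JDK (a hypothetical sketch, not necessarily the exact code we use) would be:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch of the local toString helper used in usingStream:
// drains the stream into memory and decodes it as UTF-8
private String toString(InputStream is) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buffer = new byte[4096];
    int n;
    while ((n = is.read(buffer)) != -1) {
        out.write(buffer, 0, n);
    }
    return out.toString("UTF-8");
}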
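To illustrate the spooling behaviour we are relying on, here is a minimal standalone sketch (assuming Camel 2.x; the SpoolDemo class, the DefaultCamelContext and the sample payload are only for the demo). Once more than spoolThreshold bytes are written, CachedOutputStream moves the data to a temp file under the spool directory, and getInputStream() replays it from there:

import java.io.InputStream;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.converter.stream.CachedOutputStream;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.DefaultExchange;

public class SpoolDemo {
    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        // Same settings as the <streamCaching/> element above
        context.getStreamCachingStrategy().setSpoolDirectory("target/cachedir");
        context.getStreamCachingStrategy().setSpoolThreshold(16);
        context.start();

        Exchange exchange = new DefaultExchange(context);
        CachedOutputStream cos = new CachedOutputStream(exchange, false);
        // 26 bytes > the 16-byte threshold, so this gets spooled to disk
        cos.write("abcdefghijklmnopqrstuvwxyz".getBytes("UTF-8"));

        // Replays the cached bytes, here from the temp file on disk
        InputStream replay = cos.getInputStream();
        System.out.println(context.getTypeConverter().convertTo(String.class, replay));

        context.stop();
    }
}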
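And for reference, the SPLIT-FLOW route again as a Java DSL sketch (same endpoints and streaming split as in the XML above, assuming Camel 2.x):

import org.apache.camel.builder.RouteBuilder;

public class SplitFlowRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // Streaming split: the file is tokenized line by line without
        // loading the whole 5 GB into memory
        from("file:src/data/forSplitCaching/SimpleRecord?noop=true")
            .split(body().tokenize("\n")).streaming()
                .to("direct:PROCESS-BUSINESS")
            .end();
    }
}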
Does this solution seem OK, or is there a better way?

Thanks