Thanks, I did that eventually... although it did not solve the issue. Turns
out that camel-aws forcibly converts the Exchange body to an InputStream.
Camel's File->InputStream converter buffers the InputStream so things still
fail.
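For context on why the buffering matters: the AWS SDK can upload a File (or a stream with a known Content-Length) without holding it all in memory, whereas a plain InputStream of unknown length ends up buffered. A rough stdlib-only illustration of reading a stream through a fixed-size buffer so memory stays constant (hypothetical names; not Camel or AWS SDK code):

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class StreamedLength {
    // Reads the stream through a fixed 8 KB buffer; memory use stays
    // constant no matter how large the underlying file is.
    static long streamedLength(InputStream in) throws IOException {
        byte[] buf = new byte[8192];
        long total = 0;
        int n;
        while ((n = in.read(buf)) != -1) {
            total += n;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("payload", ".json");
        Files.write(tmp, "hello\nworld\n".getBytes(StandardCharsets.UTF_8));
        try (InputStream in = Files.newInputStream(tmp)) {
            System.out.println(streamedLength(in)); // prints 12
        }
        Files.deleteIfExists(tmp);
    }
}
```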

I've attached a patch that fixes this by passing the File directly to the
AWS SDK, so the InputStream is never buffered; however, this causes one test
("sendFileAndKeep") to fail. I assume this is because the patch skips the
conversion of the Exchange body to an InputStream?

Do you want me to create a bug for this or is this intended behaviour?



On Tue, Oct 14, 2014 at 2:58 PM, Willem Jiang <willem.ji...@gmail.com>
wrote:

> You can just store the file into the message body, just as you did with
> the String objects.
>
>
> --
> Willem Jiang
>
> Red Hat, Inc.
> Web: http://www.redhat.com
> Blog: http://willemjiang.blogspot.com (English)
> http://jnn.iteye.com (Chinese)
> Twitter: willemjiang
> Weibo: 姜宁willem
>
>
>
> On October 14, 2014 at 4:09:39 PM, Andreas C. Osowski (
> andreas.chr.osow...@gmail.com) wrote:
> > Thanks for the reply. I also came to that conclusion eventually. However,
> > could you explain in more detail how I'd store each batch in a file?
> > It is my understanding so far that the Aggregator is called whenever the
> > input emits a new exchange. If I were to store the messages in a file,
> > how would I know inside the aggregator that the current batch is complete?
> > After all, I can hardly keep writing messages to the file while aws-s3
> > is pushing that same file to S3.
> > On 14 Oct 2014 09:08, "Willem Jiang" wrote:
> >
> > > I just found the code.
> > >
> > > class JsonBodyAppender {
> > > def append(existing: String, next: String) = existing + "\n" + next
> > > }
> > >
> > > There are too many Strings there; maybe you can just append the
> > > messages to a file instead.
> > >
> > > --
> > > Willem Jiang
> > >
> > > Red Hat, Inc.
> > > Web: http://www.redhat.com
> > > Blog: http://willemjiang.blogspot.com (English)
> > > http://jnn.iteye.com (Chinese)
> > > Twitter: willemjiang
> > > Weibo: 姜宁willem
> > >
> > >
> > >
> > > On October 14, 2014 at 2:32:00 PM, Willem Jiang (
> willem.ji...@gmail.com)
> > > wrote:
> > > > Can you show us how you did the aggregation? Just a code snippet
> > > > is OK.
> > > >
> > > > BTW, you can always write the stream to a file to avoid loading
> > > > the whole message into memory.
> > > >
> > > > --
> > > > Willem Jiang
> > > >
> > > > Red Hat, Inc.
> > > > Web: http://www.redhat.com
> > > > Blog: http://willemjiang.blogspot.com (English)
> > > > http://jnn.iteye.com (Chinese)
> > > > Twitter: willemjiang
> > > > Weibo: 姜宁willem
> > > >
> > > >
> > > >
> > > > On October 14, 2014 at 5:23:21 AM, Andreas C. Osowski (
> > > andreas.chr.osow...@gmail.com)
> > > > wrote:
> > > > > Hey there.
> > > > > I'm running into "OOM: Java heap space" exceptions trying to
> > > > > aggregate twitter messages before pushing them to S3. (Xmx/Xms =
> > > > > 1.5g; there's definitely sufficient free memory available when the
> > > > > exceptions happen.)
> > > > > I've also tried to use camel-leveldb as the AggregationRepository,
> > > > > but without luck. Smaller batch sizes (e.g. 100) work just fine...
> > > > > 5000 aggregated messages should also take up just 60mb-ish (assuming
> > > > > an average of 3000 chars per message).
> > > > >
> > > > > The relevant code can be found here:
> > > > > https://gist.github.com/th0br0/a1484ea1ad18b8b20c25
> > > > >
> > > > > Could somebody point me to what I'm doing wrong?
> > > > >
> > > > > Thanks a lot!
> > > > >
> > > >
> > > >
> > >
> > >
> >
>
>
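To make the file-based aggregation suggested above concrete, here is a minimal stdlib-only sketch of appending each message to a batch file instead of concatenating Strings (hypothetical names, not actual Camel Aggregator code; in a real aggregator you would then set the File itself as the exchange body so aws-s3 can stream it):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FileAppendAggregator {
    // Appends one message per line to the batch file; heap usage stays
    // bounded because the accumulated batch lives on disk, not in a String.
    static void append(Path batchFile, String message) throws IOException {
        Files.write(batchFile, (message + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public static void main(String[] args) throws IOException {
        Path batch = Files.createTempFile("batch", ".json");
        append(batch, "{\"id\":1}");
        append(batch, "{\"id\":2}");
        System.out.println(Files.readAllLines(batch)); // [{"id":1}, {"id":2}]
        Files.deleteIfExists(batch);
    }
}
```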
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java
index 825af36..f280bc4 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java
@@ -94,13 +94,19 @@ public class S3Producer extends DefaultProducer {
         }
 
         File filePayload = null;
+        PutObjectRequest putObjectRequest = null;
 
         Object obj = exchange.getIn().getMandatoryBody();
         if (obj instanceof File) {
             filePayload = (File)obj;
-        }
-        PutObjectRequest putObjectRequest = new PutObjectRequest(getConfiguration().getBucketName(),
+            putObjectRequest = new PutObjectRequest(getConfiguration().getBucketName(),
+                determineKey(exchange), filePayload);
+
+        } else {
+            putObjectRequest = new PutObjectRequest(getConfiguration().getBucketName(),
                 determineKey(exchange), exchange.getIn().getMandatoryBody(InputStream.class), objectMetadata);
+        }
+        
 
         String storageClass = determineStorageClass(exchange);
         if (storageClass != null) {
@@ -177,4 +183,4 @@ public class S3Producer extends DefaultProducer {
     public S3Endpoint getEndpoint() {
         return (S3Endpoint) super.getEndpoint();
     }
-}
\ No newline at end of file
+}
