I guess this is the reason:

a1.sinks.k4.hdfs.path = *s3n*://${path}

It should be s3:// (the same applies to k3, which also uses s3n://).
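
For example, the two S3 sinks would become something like this (just a sketch against the config below, with ${path} left as you already have it):

a1.sinks.k3.hdfs.path = s3://${path}
a1.sinks.k4.hdfs.path = s3://${path}

Note that the s3:// scheme reads its credentials from the fs.s3.awsAccessKeyId / fs.s3.awsSecretAccessKey properties in core-site.xml, while s3n:// uses the fs.s3n.* keys, so check those are set before switching. You can also sanity-check the scheme outside Flume with something like:

hadoop fs -ls s3://your-bucket/

(your-bucket here is just a placeholder for your actual bucket name.)
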
Regards,
Ahmed
On Mon, Oct 20, 2014 at 9:19 AM, Xuesong Ding <[email protected]> wrote:
> Dear all,
>
> We use flume-ng to put data into both S3 and HDFS, but some errors occur
> when closing the S3 file. Should we adjust the Flume parameters or do
> something else? Any ideas? Below are the error and our flume-ng
> configuration. Thanks a lot.
>
> BTW, our flume-ng version is cdh4.7.0
>
> Exception stack trace:
> 2014-10-14 12:00:30,158 ERROR org.apache.flume.sink.hdfs.BucketWriter: Unexpected error
> com.cloudera.org.apache.http.NoHttpResponseException: The target server failed to respond
>     at com.cloudera.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:95)
>     at com.cloudera.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:62)
>     at com.cloudera.org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:254)
>     at com.cloudera.org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:289)
>     at com.cloudera.org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:252)
>     at com.cloudera.org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219)
>     at com.cloudera.org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:300)
>     at com.cloudera.org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:127)
>     at com.cloudera.org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:715)
>     at com.cloudera.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:520)
>     at com.cloudera.org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:906)
>     at com.cloudera.org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:805)
>     at com.cloudera.org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:334)
>     at com.cloudera.org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:281)
>     at com.cloudera.org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestPut(RestStorageService.java:1043)
>     at com.cloudera.org.jets3t.service.impl.rest.httpclient.RestStorageService.copyObjectImpl(RestStorageService.java:2029)
>     at com.cloudera.org.jets3t.service.StorageService.copyObject(StorageService.java:871)
>     at com.cloudera.org.jets3t.service.StorageService.copyObject(StorageService.java:916)
>     at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:314)
>     at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
>     at org.apache.hadoop.fs.s3native.$Proxy18.copy(Unknown Source)
>     at org.apache.hadoop.fs.s3native.NativeS3FileSystem.rename(NativeS3FileSystem.java:645)
>     at org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:541)
>     at org.apache.flume.sink.hdfs.BucketWriter$8$1.run(BucketWriter.java:589)
>     at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:161)
>     at org.apache.flume.sink.hdfs.BucketWriter.access$800(BucketWriter.java:57)
>     at org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:586)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
>
> Flume-ng configuration:
> ## THIS FILE CONTAINS FLUME TIER_1 CONFIGURATION
>
> # DEFINE COMPONENTS
> a1.sources = r1
> a1.sinks = k1 k2 k3 k4
> a1.channels = c1 c2
> a1.sinkgroups = g1 g2
>
> # SOURCE(CUSTOM)
> a1.sources.r1.type = spooldir
> a1.sources.r1.spoolDir = /ssd/disk2
> a1.sources.r1.deletePolicy= never
> a1.sources.r1.ignorePattern= ^.*\\.tmp$
> a1.sources.r1.batchSize= 1
> a1.sources.r1.deserializer=
> org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
> a1.sources.r1.deserializer.maxBlobLength = 300000000
>
> # SINK (HDFS)
> a1.sinks.k1.type = hdfs
> a1.sinks.k1.hdfs.filePrefix = packet
> a1.sinks.k1.hdfs.batchSize= 1
> a1.sinks.k1.hdfs.fileSuffix = .snappy
> a1.sinks.k1.hdfs.codeC = snappy
> a1.sinks.k1.hdfs.fileType = CompressedStream
> a1.sinks.k1.hdfs.rollCount = 0
> a1.sinks.k1.hdfs.rollSize = 500000000
> a1.sinks.k1.hdfs.rollInterval = 300
> a1.sinks.k1.hdfs.path = hdfs:/${path}
>
> # SINK (HDFS)
> a1.sinks.k2.type = hdfs
> a1.sinks.k2.hdfs.filePrefix = packet
> a1.sinks.k2.hdfs.batchSize= 1
> a1.sinks.k2.hdfs.fileSuffix = .snappy
> a1.sinks.k2.hdfs.codeC = snappy
> a1.sinks.k2.hdfs.fileType = CompressedStream
> a1.sinks.k2.hdfs.rollCount = 0
> a1.sinks.k2.hdfs.rollSize = 500000000
> a1.sinks.k2.hdfs.rollInterval = 300
> a1.sinks.k2.hdfs.path = hdfs://${path}
>
> #SINK (S3)
> a1.sinks.k3.type = hdfs
> a1.sinks.k3.hdfs.filePrefix = packet
> a1.sinks.k3.hdfs.batchSize= 1
> a1.sinks.k3.hdfs.fileSuffix = .snappy
> a1.sinks.k3.hdfs.codeC = snappy
> a1.sinks.k3.hdfs.fileType = CompressedStream
> a1.sinks.k3.hdfs.rollCount = 0
> a1.sinks.k3.hdfs.rollSize = 500000000
> a1.sinks.k3.hdfs.rollInterval = 300
> a1.sinks.k3.hdfs.path = s3n://${path}
>
> #SINK (S3)
> a1.sinks.k4.type = hdfs
> a1.sinks.k4.hdfs.filePrefix = packet
> a1.sinks.k4.hdfs.batchSize= 1
> a1.sinks.k4.hdfs.fileSuffix = .snappy
> a1.sinks.k4.hdfs.codeC = snappy
> a1.sinks.k4.hdfs.fileType = CompressedStream
> a1.sinks.k4.hdfs.rollCount = 0
> a1.sinks.k4.hdfs.rollSize = 500000000
> a1.sinks.k4.hdfs.rollInterval = 300
> a1.sinks.k4.hdfs.path = s3n://${path}
>
> a1.sinkgroups.g1.sinks = k1 k2
> a1.sinkgroups.g1.processor.type = load_balance
> a1.sinkgroups.g1.processor.backoff = true
> a1.sinkgroups.g1.processor.selector = random
>
> a1.sinkgroups.g2.sinks = k3 k4
> a1.sinkgroups.g2.processor.type = load_balance
> a1.sinkgroups.g2.processor.backoff = true
> a1.sinkgroups.g2.processor.selector = random
>
> # INTERCEPTORS (TIMESTAMP FOR HDFS PATH)
> a1.sources.r1.interceptors = i1 i2
> a1.sources.r1.interceptors.i1.type = timestamp
> a1.sources.r1.interceptors.i2.type = host
> a1.sources.r1.interceptors.i2.preserveExisting = false
>
> # CHANNEL (MEM)
> a1.channels.c1.type = memory
> a1.channels.c1.capacity = 500
> a1.channels.c1.transactionCapacity = 1
> #a1.channels.c1.byteCapacity = 3000000000
>
> # CHANNEL (MEM)
> a1.channels.c2.type = memory
> a1.channels.c2.capacity = 500
> a1.channels.c2.transactionCapacity = 1
> #a1.channels.c2.byteCapacity = 3000000000
>
> ## bind the source and sink to the channel
> a1.sources.r1.channels = c1 c2
> a1.sources.r1.selector.type = replicating
>
> a1.sinks.k1.channel = c1
> a1.sinks.k2.channel = c1
>
> a1.sinks.k3.channel = c2
> a1.sinks.k4.channel = c2
>
> --
> *Thanks!*
>