Folks,

I was following the post "Hadoop at Twitter (part 1): Splittable LZO Compression" 
to integrate LZO into Hadoop 2.0, but Flume-NG's LZO compression does not seem to work.

My flume-ng configuration file is:

cat > /tmp/flume-lzo.conf <<EOF
agent.sources = lzo-avro-collect
agent.channels = lzo-memory-channel
agent.sinks = lzo-hdfs-write

agent.sources.lzo-avro-collect.type = avro
agent.sources.lzo-avro-collect.bind = 0.0.0.0
agent.sources.lzo-avro-collect.port = 12345
agent.sources.lzo-avro-collect.channels = lzo-memory-channel
agent.channels.lzo-memory-channel.type = memory
agent.channels.lzo-memory-channel.capacity = 1000000
agent.channels.lzo-memory-channel.transactionCapacity = 10000
agent.channels.lzo-memory-channel.stay-alive = 3
agent.sinks.lzo-hdfs-write.type = hdfs
agent.sinks.lzo-hdfs-write.hdfs.path = hdfs://10.34.4.55:8020/tmp/
agent.sinks.lzo-hdfs-write.hdfs.filePrefix = test%Y
agent.sinks.lzo-hdfs-write.channel = lzo-memory-channel
agent.sinks.lzo-hdfs-write.hdfs.rollInterval = 3600
agent.sinks.lzo-hdfs-write.hdfs.rollSize = 209715200
agent.sinks.lzo-hdfs-write.hdfs.rollCount = 0
agent.sinks.lzo-hdfs-write.hdfs.batchSize = 1000
agent.sinks.lzo-hdfs-write.hdfs.codeC = lzo
agent.sinks.lzo-hdfs-write.hdfs.fileType = CompressedStream
EOF
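The configuration above wires an Avro source to an HDFS sink through a memory channel. As a quick sanity check, a small helper (hypothetical, not part of Flume) can parse such a properties file and verify that every source and sink references a declared channel:

```python
# Hypothetical helper (not part of Flume): parse flume-ng properties text
# and report sources/sinks wired to channels the agent never declared.
def check_wiring(conf_text, agent="agent"):
    props = {}
    for line in conf_text.splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    channels = set(props.get(f"{agent}.channels", "").split())
    problems = []
    for src in props.get(f"{agent}.sources", "").split():
        for ch in props.get(f"{agent}.sources.{src}.channels", "").split():
            if ch not in channels:
                problems.append(f"source {src} -> unknown channel {ch}")
    for sink in props.get(f"{agent}.sinks", "").split():
        ch = props.get(f"{agent}.sinks.{sink}.channel", "")
        if ch not in channels:
            problems.append(f"sink {sink} -> unknown channel {ch}")
    return problems
```

Running it against the config above returns an empty list, so the wiring itself looks fine; the problem is elsewhere.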
Then I start the flume-ng agent in the foreground:

sudo -u flume flume-ng agent -n agent -f /tmp/flume-lzo.conf
Next I use avro-client to ship an event:

echo aaaaaaaaaaaaaaaaa > /tmp/events
sudo -u flume flume-ng avro-client -H localhost -p 12345 -F /tmp/events
The flume-ng agent (collector) log is as follows:

12/08/28 06:33:53 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
12/08/28 06:33:53 INFO lzo.LzoCodec: Successfully loaded & initialized 
native-lzo library [hadoop-lzo rev 6bb1b7f8b9044d8df9b4d2b6641db7658aab3cf8]
12/08/28 06:33:54 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
12/08/28 06:33:54 INFO nodemanager.DefaultLogicalNodeManager: Starting new 
configuration:{ sourceRunners:{lzo-avro-collect=EventDrivenSourceRunner: { 
source:AvroSource: { bindAddress:0.0.0.0 port:12345 } }} 
sinkRunners:{lzo-hdfs-write=SinkRunner: { 
policy:org.apache.flume.sink.DefaultSinkProcessor@39e57e8f counterGroup:{ 
name:null counters:{} } }} 
channels:{lzo-memory-channel=org.apache.flume.channel.MemoryChannel@9d7fbfb} }
12/08/28 06:33:54 INFO nodemanager.DefaultLogicalNodeManager: Starting Channel 
lzo-memory-channel
12/08/28 06:33:54 INFO nodemanager.DefaultLogicalNodeManager: Starting Sink 
lzo-hdfs-write
12/08/28 06:33:54 INFO nodemanager.DefaultLogicalNodeManager: Starting Source 
lzo-avro-collect
12/08/28 06:33:54 INFO source.AvroSource: Avro source starting:AvroSource: { 
bindAddress:0.0.0.0 port:12345 }
12/08/28 06:34:02 INFO ipc.NettyServer: [id: 0x651db6bb, /127.0.0.1:48085 => 
/127.0.0.1:12345] OPEN
12/08/28 06:34:02 INFO ipc.NettyServer: [id: 0x651db6bb, /127.0.0.1:48085 => 
/127.0.0.1:12345] BOUND: /127.0.0.1:12345
12/08/28 06:34:02 INFO ipc.NettyServer: [id: 0x651db6bb, /127.0.0.1:48085 => 
/127.0.0.1:12345] CONNECTED: /127.0.0.1:48085
12/08/28 06:34:02 INFO ipc.NettyServer: [id: 0x651db6bb, /127.0.0.1:48085 :> 
/127.0.0.1:12345] DISCONNECTED
12/08/28 06:34:02 INFO ipc.NettyServer: [id: 0x651db6bb, /127.0.0.1:48085 :> 
/127.0.0.1:12345] UNBOUND
12/08/28 06:34:02 INFO ipc.NettyServer: [id: 0x651db6bb, /127.0.0.1:48085 :> 
/127.0.0.1:12345] CLOSED
12/08/28 06:34:03 INFO hdfs.BucketWriter: Creating 
hdfs://10.34.4.55:8020/tmp//test.1346135643045.lzo_deflate.tmp
12/08/28 06:34:04 WARN conf.Configuration: hadoop.native.lib is deprecated. 
Instead, use io.native.lib.available
^C12/08/28 06:34:26 INFO node.FlumeNode: Flume node stopping - agent
12/08/28 06:34:26 INFO lifecycle.LifecycleSupervisor: Stopping lifecycle 
supervisor 8
12/08/28 06:34:26 INFO nodemanager.DefaultLogicalNodeManager: Node manager 
stopping
12/08/28 06:34:26 INFO lifecycle.LifecycleSupervisor: Stopping lifecycle 
supervisor 8
12/08/28 06:34:26 INFO source.AvroSource: Avro source stopping:AvroSource: { 
bindAddress:0.0.0.0 port:12345 }
12/08/28 06:34:26 INFO hdfs.HDFSEventSink: Closing 
hdfs://10.34.4.55:8020/tmp//test
12/08/28 06:34:26 INFO hdfs.BucketWriter: Renaming 
hdfs://10.34.4.55:8020/tmp/test.1346135643045.lzo_deflate.tmp to 
hdfs://10.34.4.55:8020/tmp/test.1346135643045.lzo_deflate
12/08/28 06:34:26 INFO properties.PropertiesFileConfigurationProvider: 
Configuration provider stopping
When I shut down the collector, the agent had generated a file, 
/tmp/test.1346135643045.lzo_deflate. It looks OK, but I can't 
decompress it. The commands are as follows:

[root@ip-10-34-4-55 tmp]# sudo -u hdfs hadoop fs -ls /tmp/test*
Found 1 items
-rw-r--r--   3 flume supergroup         30 2012-08-28 06:34 
/tmp/test.1346135643045.lzo_deflate
[root@ip-10-34-4-55 tmp]# sudo -u hdfs hadoop fs -get 
/tmp/test.1346135643045.lzo_deflate ./
12/08/28 06:36:26 INFO util.NativeCodeLoader: Loaded the native-hadoop library
[root@ip-10-34-4-55 tmp]# lzop -d test.1346135643045.lzo_deflate
lzop: test.1346135643045.lzo_deflate: not a lzop file
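If I understand the formats correctly, lzop rejects the file because every lzop-format file starts with a fixed 9-byte magic sequence, while Hadoop's LzoCodec writes a raw compressed stream with no such container header (hence the .lzo_deflate extension). A minimal Python sketch of the check lzop is performing:

```python
# The 9-byte magic that begins every lzop-format file, per the lzop
# file format; a raw LZO stream (".lzo_deflate") does not carry it.
LZOP_MAGIC = b"\x89LZO\x00\r\n\x1a\n"

def is_lzop_file(path):
    """Return True if the file starts with the lzop container magic."""
    with open(path, "rb") as f:
        return f.read(len(LZOP_MAGIC)) == LZOP_MAGIC
```

A file failing this check would explain both the "not a lzop file" message from lzop -d and the "Invalid LZO header" IOException that LzopInputStream throws further down.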
I also tried to verify the Flume-NG-generated LZO file with Hadoop's 
LzoIndexer, which does not work either. The commands and exception are as follows:

[root@ip-10-34-4-55 tmp]# sudo -u hdfs hadoop fs -cp 
/tmp/test.1346135643045.lzo_deflate /tmp/test.1346135643045.lzo
export 
JAVA_LIBRARY_PATH=/usr/lib/hadoop/lib/native:/usr/lib/hadoop/lib/native/Linux-amd64-64
hadoop jar /usr/lib/hadoop/lib/hadoop-lzo-0.4.15.jar 
com.hadoop.compression.lzo.LzoIndexer /tmp/test.1346135643045.lzo

[root@ip-10-34-4-55 tmp]# hadoop jar /usr/lib/hadoop/lib/hadoop-lzo-0.4.15.jar 
com.hadoop.compression.lzo.LzoIndexer /tmp/test.1346135643045.lzo

12/08/28 06:40:14 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
12/08/28 06:40:14 INFO lzo.LzoCodec: Successfully loaded & initialized 
native-lzo library [hadoop-lzo rev 6bb1b7f8b9044d8df9b4d2b6641db7658aab3cf8]
12/08/28 06:40:15 INFO lzo.LzoIndexer: [INDEX] LZO Indexing file 
/tmp/test.1346135643045.lzo, size 0.00 GB...
12/08/28 06:40:15 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/08/28 06:40:15 WARN snappy.LoadSnappy: Snappy native library is available
12/08/28 06:40:15 INFO snappy.LoadSnappy: Snappy native library loaded
12/08/28 06:40:15 WARN conf.Configuration: hadoop.native.lib is deprecated. 
Instead, use io.native.lib.available
12/08/28 06:40:15 ERROR lzo.LzoIndexer: Error indexing 
/tmp/test.1346135643045.lzo
java.io.IOException: Invalid LZO header
    at 
com.hadoop.compression.lzo.LzopInputStream.readHeader(LzopInputStream.java:116)
    at 
com.hadoop.compression.lzo.LzopInputStream.<init>(LzopInputStream.java:54)
    at com.hadoop.compression.lzo.LzopCodec.createInputStream(LzopCodec.java:83)
    at com.hadoop.compression.lzo.LzoIndex.createIndex(LzoIndex.java:231)
    at 
com.hadoop.compression.lzo.LzoIndexer.indexSingleFile(LzoIndexer.java:117)
    at com.hadoop.compression.lzo.LzoIndexer.indexInternal(LzoIndexer.java:98)
    at com.hadoop.compression.lzo.LzoIndexer.index(LzoIndexer.java:52)
    at com.hadoop.compression.lzo.LzoIndexer.main(LzoIndexer.java:137)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:208)
[root@ip-10-34-4-55 tmp]#
Thanks, 
- Kevin
