[ 
https://issues.apache.org/jira/browse/FLUME-3305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rumit Patel resolved FLUME-3305.
--------------------------------
    Resolution: Abandoned

> Working data pipeline using flume
> ---------------------------------
>
>                 Key: FLUME-3305
>                 URL: https://issues.apache.org/jira/browse/FLUME-3305
>             Project: Flume
>          Issue Type: Question
>            Reporter: Rumit Patel
>            Priority: Major
>
> I'm trying to introduce Flume at my current workplace and am running into some 
> issues. I would like some help/direction on my approach to the problem so I 
> can work through it rather than banging my head against it.
> I'm consuming KafkaAvroSerialized messages from a Kafka topic and need to land 
> them in HDFS, partitioned by the eventTime defined in each message.
> My approach is (Kafka source -> memory channel -> HDFS sink).
>  
> {code:java}
> # Name the components on this agent
> a1.sources = r1
> a1.sinks = k1
> a1.channels = c1
> # Describe/configure the source
> a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
> a1.sources.r1.channels = c1
> a1.sources.r1.batchSize = 500
> a1.sources.r1.batchDurationMillis = 1000
> a1.sources.r1.kafka.bootstrap.servers = localhost:9092
> a1.sources.r1.kafka.topics = conversationAction
> a1.sources.r1.kafka.consumer.group.id = flumeTest
> #a1.sources.r1.kafka.consumer.key.deserializer = io.confluent.kafka.serializers.KafkaAvroDeserializer
> # (enabling the key deserializer above blows up in KafkaSource at line 226)
> #a1.sources.r1.kafka.consumer.value.deserializer = io.confluent.kafka.serializers.KafkaAvroDeserializer
> #a1.sources.r1.kafka.consumer.schema.registry.url = http://localhost:8081
> #a1.sources.r1.useFlumeEventFormat = true
> #a1.sources.r1.kafka.consumer.specific.avro.reader = true
> # define data interceptor (so I can determine the partition to use for dropping the data)
> a1.sources.r1.interceptors = i1
> a1.sources.r1.interceptors.i1.type = com.jetblack.avro.intercept.EventInterceptor$Builder
> a1.sources.r1.interceptors.i1.hitDateHeader = hitdate
> a1.sinks.k1.type = hdfs
> a1.sinks.k1.channel = c1
> a1.sinks.k1.hdfs.path = /tmp/flume/%{topic}/hitdate=%{hitdate}
> a1.sinks.k1.hdfs.fileType = DataStream
> a1.sinks.k1.hdfs.fileSuffix = .avro
> # Roll files in HDFS every 5 min or at 255MB; don't roll based on number of records
> # We roll at 255MB because our block size is 128MB; we want 2 full blocks without going over
> a1.sinks.k1.hdfs.rollInterval = 60
> a1.sinks.k1.hdfs.rollSize = 267386880
> a1.sinks.k1.hdfs.rollCount = 0
> # Write to HDFS file in batches of 20 records
> a1.sinks.k1.hdfs.batchSize = 20
> a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
> a1.sinks.k1.serializer.schemaURL = file:///actions.avsc
> # the following property gives a generic Avro event; we need a custom Avro event, so use the 2 properties above
> #a1.sinks.k1.serializer = avro_event
> a1.sinks.k1.serializer.compressionCodec = snappy
> # Use a channel which buffers events in memory
> a1.channels.c1.type = memory
> a1.channels.c1.capacity = 1000
> a1.channels.c1.transactionCapacity = 1000
> # Bind the source and sink to the channel
> a1.sources.r1.channels = c1
> a1.sinks.k1.channel = c1
> {code}
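The interceptor referenced above (com.jetblack.avro.intercept.EventInterceptor is the reporter's custom class, not part of Flume) presumably reads the eventTime from each message and sets a hitdate header, which the sink's %{hitdate} path escape then consumes. A minimal, Flume-independent sketch of that header-derivation logic; the class name, date pattern, and epoch-millis eventTime representation here are assumptions:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

// Sketch of the header logic a hitdate interceptor would apply per event.
// In a real Flume interceptor this would live in intercept(Event), writing
// to event.getHeaders(); here it is a plain function so it stands alone.
public class HitDateHeader {
    // Assumed to match the hitdate=... partition layout in hdfs.path.
    private static final DateTimeFormatter HITDATE_FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    /** Derive the hitdate header value from an epoch-millis eventTime. */
    public static String hitDateFor(long eventTimeMillis) {
        return HITDATE_FORMAT.format(Instant.ofEpochMilli(eventTimeMillis));
    }

    /** Add the header that the HDFS sink escape %{hitdate} will read. */
    public static Map<String, String> withHitDate(Map<String, String> headers,
                                                  long eventTimeMillis) {
        Map<String, String> out = new HashMap<>(headers);
        out.put("hitdate", hitDateFor(eventTimeMillis));
        return out;
    }

    public static void main(String[] args) {
        long eventTime = 1546300800000L; // 2019-01-01T00:00:00Z
        Map<String, String> headers = withHitDate(new HashMap<>(), eventTime);
        System.out.println(headers.get("hitdate")); // 20190101
    }
}
```

A real implementation would wrap this in Flume's Interceptor/Builder pair configured by the i1.type property above.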
>  
>  
> End users need to be able to access the data using Hive queries, so the Hive 
> table is created with the following syntax.
>  
> {code:java}
> CREATE EXTERNAL TABLE IF NOT EXISTS conversationactiontest
> PARTITIONED BY (hitdate INT)
> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
> STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
> OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
> LOCATION 'file:///tmp/flume/actionsdir'
> TBLPROPERTIES ('avro.schema.url'='file:///src/main/resources/avro/actions.avsc');
> {code}
>  
> It seems to create the data files, however I'm noticing 2 issues:
>  # When I add the header in the interceptor it results in messy Avro files, 
> which is not what we want.
>  # Queries fail with the following error, which is very frustrating:
>  
> {code:java}
> Failed with exception java.io.IOException:org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
> {code}
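"Malformed data. Length is negative" typically means the Avro reader hit bytes that are not a plain Avro-encoded datum and misread them as a (negative) length. With Confluent-serialized Kafka messages, one common cause is the schema-registry wire format: each record value starts with a magic byte (0x00) and a 4-byte big-endian schema id before the Avro binary payload, and if that 5-byte prefix reaches the HDFS files, Hive's Avro SerDe chokes on it. Whether that is the cause here depends on the producer; a sketch of unwrapping that framing:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Unwrap the Confluent schema-registry wire format:
//   byte 0    : magic byte, always 0x00
//   bytes 1-4 : schema id, big-endian int
//   bytes 5.. : the actual Avro binary-encoded datum
public class ConfluentFrame {
    public static final byte MAGIC_BYTE = 0x0;

    /** Return the raw Avro payload, stripping the 5-byte Confluent header. */
    public static byte[] avroPayload(byte[] kafkaValue) {
        ByteBuffer buf = ByteBuffer.wrap(kafkaValue);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not Confluent wire format");
        }
        int schemaId = buf.getInt(); // big-endian schema-registry id
        return Arrays.copyOfRange(kafkaValue, 5, kafkaValue.length);
    }

    public static void main(String[] args) {
        // Hypothetical framed message: 0x00 header, schema id 42,
        // followed by a 3-byte Avro payload.
        byte[] framed = {0, 0, 0, 0, 42, 1, 2, 3};
        byte[] payload = avroPayload(framed);
        System.out.println(payload.length); // 3
    }
}
```

In practice the KafkaAvroDeserializer does this unwrapping itself, which is why getting the commented-out consumer deserializer settings working (rather than post-processing bytes) would be the cleaner fix.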
>  
> I've tried a few options but have had no luck yet getting to the root cause. 
> Any help would be highly appreciated.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
