[
https://issues.apache.org/jira/browse/FLUME-3305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rumit Patel resolved FLUME-3305.
--------------------------------
Resolution: Abandoned
> Working data pipeline using flume
> ---------------------------------
>
> Key: FLUME-3305
> URL: https://issues.apache.org/jira/browse/FLUME-3305
> Project: Flume
> Issue Type: Question
> Reporter: Rumit Patel
> Priority: Major
>
> I'm trying to introduce Flume at my current workplace and am running into some
> issues. I would like to get help/direction on my approach to the problem so I
> can work through it rather than banging my head on it.
> I'm consuming KafkaAvroSerializer-encoded messages from a Kafka topic and need to drain
> them to HDFS based on the eventTime defined in the message.
> My approach is (Kafka source -> memory channel -> HDFS sink).
>
> {code:java}
> # Name the components on this agent
> a1.sources = r1
> a1.sinks = k1
> a1.channels = c1
> # Describe/configure the source
> a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
> a1.sources.r1.channels = c1
> a1.sources.r1.batchSize = 500
> a1.sources.r1.batchDurationMillis = 1000
> a1.sources.r1.kafka.bootstrap.servers = localhost:9092
> a1.sources.r1.kafka.topics = conversationAction
> a1.sources.r1.kafka.consumer.group.id = flumeTest
> # Using the KafkaAvroDeserializer below blows up at KafkaSource line 226
> #a1.sources.r1.kafka.consumer.key.deserializer = io.confluent.kafka.serializers.KafkaAvroDeserializer
> #a1.sources.r1.kafka.consumer.value.deserializer = io.confluent.kafka.serializers.KafkaAvroDeserializer
> #a1.sources.r1.kafka.consumer.schema.registry.url = http://localhost:8081
> #a1.sources.r1.useFlumeEventFormat = true
> #a1.sources.r1.kafka.consumer.specific.avro.reader = true
> # Define data interceptor (so I can determine the partition to use for dropping the data)
> a1.sources.r1.interceptors = i1
> a1.sources.r1.interceptors.i1.type = com.jetblack.avro.intercept.EventInterceptor$Builder
> a1.sources.r1.interceptors.i1.hitDateHeader = hitdate
> a1.sinks.k1.type = hdfs
> a1.sinks.k1.channel = c1
> a1.sinks.k1.hdfs.path = /tmp/flume/%{topic}/hitdate=%{hitdate}
> a1.sinks.k1.hdfs.fileType = DataStream
> a1.sinks.k1.hdfs.fileSuffix = .avro
> # Roll files in HDFS every 5 min or at 255MB; don't roll based on number of records
> # We roll at 255MB because our block size is 128MB; we want 2 full blocks without going over
> a1.sinks.k1.hdfs.rollInterval = 60
> a1.sinks.k1.hdfs.rollSize = 267386880
> a1.sinks.k1.hdfs.rollCount = 0
> # Write to HDFS file in batches of 20 records
> a1.sinks.k1.hdfs.batchSize = 20
> a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
> a1.sinks.k1.serializer.schemaURL = file:///actions.avsc
> # The following property gives a generic avro event; we need a custom avro event, so use the 2 properties above
> #a1.sinks.k1.serializer = avro_event
> a1.sinks.k1.serializer.compressionCodec = snappy
> # Use a channel which buffers events in memory
> a1.channels.c1.type = memory
> a1.channels.c1.capacity = 1000
> a1.channels.c1.transactionCapacity = 1000
> # Bind the source and sink to the channel
> a1.sources.r1.channels = c1
> a1.sinks.k1.channel = c1
> {code}
>
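> For context, the custom interceptor referenced above follows the standard Flume
> Interceptor/Builder pattern. A simplified sketch is shown below; the real implementation
> reads eventTime out of the Avro-encoded body, which is elided here and replaced with a
> hypothetical "timestamp" header lookup, so treat it as an illustration rather than the
> actual class.
>
> {code:java}
> package com.jetblack.avro.intercept;
>
> import java.text.SimpleDateFormat;
> import java.util.Date;
> import java.util.List;
> import java.util.Map;
>
> import org.apache.flume.Context;
> import org.apache.flume.Event;
> import org.apache.flume.interceptor.Interceptor;
>
> /**
>  * Sets a yyyyMMdd "hitdate" header so the HDFS sink can use %{hitdate} in its path.
>  * The eventTime extraction from the Avro body is simplified to a header lookup here.
>  */
> public class EventInterceptor implements Interceptor {
>
>   private final String hitDateHeader;
>
>   private EventInterceptor(String hitDateHeader) {
>     this.hitDateHeader = hitDateHeader;
>   }
>
>   @Override
>   public void initialize() {
>     // nothing to set up
>   }
>
>   @Override
>   public Event intercept(Event event) {
>     Map<String, String> headers = event.getHeaders();
>     // Stand-in for deserializing the Avro body and reading its eventTime field
>     String ts = headers.get("timestamp");
>     long eventTime = (ts != null) ? Long.parseLong(ts) : System.currentTimeMillis();
>     headers.put(hitDateHeader, new SimpleDateFormat("yyyyMMdd").format(new Date(eventTime)));
>     return event;
>   }
>
>   @Override
>   public List<Event> intercept(List<Event> events) {
>     for (Event e : events) {
>       intercept(e);
>     }
>     return events;
>   }
>
>   @Override
>   public void close() {
>     // nothing to clean up
>   }
>
>   /** Referenced from the agent config as com.jetblack.avro.intercept.EventInterceptor$Builder. */
>   public static class Builder implements Interceptor.Builder {
>
>     private String hitDateHeader = "hitdate";
>
>     @Override
>     public void configure(Context context) {
>       // Reads the a1.sources.r1.interceptors.i1.hitDateHeader property
>       hitDateHeader = context.getString("hitDateHeader", "hitdate");
>     }
>
>     @Override
>     public Interceptor build() {
>       return new EventInterceptor(hitDateHeader);
>     }
>   }
> }
> {code}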
>
> End users need to be able to access the data using Hive queries, so the Hive
> table is created using the following syntax.
>
> {code:java}
> CREATE EXTERNAL TABLE IF NOT EXISTS conversationactiontest
> PARTITIONED BY (hitdate INT)
> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
> STORED AS INPUTFORMAT
> 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
> OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
> LOCATION 'file:///tmp/flume/actionsdir'
> TBLPROPERTIES
> ('avro.schema.url'='file:///src/main/resources/avro/actions.avsc');
> {code}
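>
> Note that since the table is partitioned on hitdate, the hitdate=... directories that
> Flume creates have to be registered as partitions before queries return any rows. A
> sketch of what that looks like (the date value is just an example, and this assumes the
> table LOCATION points at the directory the HDFS sink writes into):
>
> {code:java}
> -- Register a single day's directory as a partition
> ALTER TABLE conversationactiontest ADD IF NOT EXISTS PARTITION (hitdate=20190101);
>
> -- Or discover all hitdate=* directories under the table location at once
> MSCK REPAIR TABLE conversationactiontest;
> {code}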
>
> It seems to be able to create data files; however, I'm noticing 2 issues:
> # When I add the header via the interceptor, it results in messy Avro files, which is not what we want.
> # Queries result in the following error:
>
> {code:java}
> Failed with exception
> java.io.IOException:org.apache.avro.AvroRuntimeException: Malformed data.
> Length is negative: -1
> {code}
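>
> To narrow down whether the rolled files are valid Avro container files at all (as opposed
> to the problem being on the Hive side), one of them can be read back directly with the
> plain Avro API. A minimal check along these lines (the file path argument is a placeholder):
>
> {code:java}
> import java.io.File;
>
> import org.apache.avro.file.DataFileReader;
> import org.apache.avro.generic.GenericDatumReader;
> import org.apache.avro.generic.GenericRecord;
>
> public class AvroFileCheck {
>   public static void main(String[] args) throws Exception {
>     // Path to a local copy of one of the files the HDFS sink rolled
>     File avroFile = new File(args[0]);
>     try (DataFileReader<GenericRecord> reader =
>         new DataFileReader<>(avroFile, new GenericDatumReader<GenericRecord>())) {
>       System.out.println("Writer schema: " + reader.getSchema());
>       long count = 0;
>       for (GenericRecord record : reader) {
>         count++;
>       }
>       System.out.println("Records read: " + count);
>     }
>   }
> }
> {code}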
>
> I've tried a few options but no luck yet getting to the root cause. Any help
> would be highly appreciated.
>