Santosh Balasubramanya created FLUME-3047:
---------------------------------------------
Summary: Avro Sink HDFS with
org.apache.flume.sink.hdfs.AvroEventSerializer$Builder not working
Key: FLUME-3047
URL: https://issues.apache.org/jira/browse/FLUME-3047
Project: Flume
Issue Type: Bug
Components: Client SDK, Sinks+Sources
Affects Versions: v1.7.0
Reporter: Santosh Balasubramanya
Priority: Blocker
With the configuration below, Avro messages from a Kafka topic are pulled and
written to HDFS successfully. However, deserializing the resulting file with
avro-tools (java -jar avro-tools-1.8.1.jar tojson FlumeData.1484909338012 >
flume) throws an exception.
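A quick way to narrow this down is to check whether the file written by the sink is an Avro object container file at all, since that is the format avro-tools tojson reads. Container files begin with the four magic bytes 'O', 'b', 'j', 0x01. The Python sketch below only inspects those bytes; the function name is made up for illustration, and it assumes the file has first been copied out of HDFS to the local filesystem.

```python
AVRO_MAGIC = b"Obj\x01"  # first four bytes of every Avro object container file


def looks_like_avro_container(path):
    """Return True if the local file at `path` starts with the Avro magic bytes.

    Hypothetical helper for illustration; it does not parse the schema or
    the data blocks, so a True result does not prove the file is readable.
    """
    with open(path, "rb") as f:
        return f.read(len(AVRO_MAGIC)) == AVRO_MAGIC
```

If looks_like_avro_container("FlumeData.1484909338012") returns False, the sink wrote something other than a container file (for example raw event bodies). If it returns True, the problem more likely lies in the embedded schema or the encoded records.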
The Flume configuration, the Avro schema, and a sample message follow.
agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafka-source.zookeeperConnect = machinemae:2181
agent1.sources.kafka-source.topic = unverified
agent1.sources.kafka-source.groupId = flume
agent1.sources.kafka-source.channels = memory-channel
agent1.sources.kafka-source.interceptors = i1
agent1.sources.kafka-source.interceptors.i1.type = timestamp
agent1.sources.kafka-source.kafka.consumer.timeout.ms = 100
#agent1.sources.kafka-source.useFlumeEventFormat = true
agent1.channels.memory-channel.type = memory
agent1.channels.memory-channel.capacity = 10000
agent1.channels.memory-channel.transactionCapacity = 1000
agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.hdfs.fileSuffix=.avro
agent1.sinks.hdfs-sink.hdfs.path = /company/jar/source/gu33/s4/1.35/%{topic}/%y-%m-%d
agent1.sinks.hdfs-sink.hdfs.rollInterval = 5
agent1.sinks.hdfs-sink.hdfs.rollSize = 0
agent1.sinks.hdfs-sink.hdfs.rollCount = 0
agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
#agent1.sinks.hdfs-sink.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink.channel = memory-channel
#agent1.sinks.hdfs-sink.serializer = avro_event
agent1.sinks.hdfs-sink.serializer.compressionCodec = snappy
agent1.sinks.hdfs-sink.serializer=org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
agent1.sinks.hdfs-sink.serializer.schemaURL = hdfs://machinemane:9000/ca/gu33.avsc
agent1.sources = kafka-source
agent1.channels = memory-channel
agent1.sinks = hdfs-sink
############################
The Avro schema and a sample message used with this configuration are below.
Avro schema:
{
"type" : "record",
"name" : "xmenHeader",
"namespace" : "com.company.xmen",
"fields" : [ {
"name" : "header",
"type" : {
"type" : "record",
"name" : "header",
"fields" : [ {
"name" : "tenant_id",
"type" : [ "null", "string" ],
"default" : "null"
}, {
"name" : "doc_type_id",
"type" : [ "null", "string" ],
"default" : "null"
}, {
"name" : "unique_id",
"type" : [ "null", "string" ],
"default" : "null"
}, {
"name" : "doc_type_version",
"type" : [ "null", "string" ],
"default" : "null"
}, {
"name" : "product_id",
"type" : [ "null", "string" ],
"default" : "null"
} ]
}
}, {
"name" : "body",
"type" : {
"type" : "record",
"name" : "body",
"fields" : [ {
"name" : "name",
"type" : [ "null", {
"type" : "record",
"name" : "name_name_0",
"fields" : [ {
"name" : "app_id",
"type" : [ "null", "string" ],
"default" : "null"
} ]
} ],
"default" : "null"
} ]
}
} ]
}
Actual JSON message:
{
"header": {
"product_id": "GU33",
"tenant_id": "tenant_name",
"doc_type_id": "s4",
"doc_type_version": "1.35"
},
"body":
{"name" : {"app_id":"testApp_ID"}}
}
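Two things about the schema and message above can be checked mechanically. The Python sketch below uses only the standard json module and a trimmed copy of the "header" record (two of its five fields); the helper names are made up for illustration. It lists schema fields the message does not carry, and fields whose union starts with "null" but whose default is the string "null" rather than JSON null (the Avro specification requires a union field's default to match the first branch of the union). Whether either finding is related to the avro-tools exception is not established here.

```python
import json

# Trimmed copy of the "header" record from the schema above (illustrative subset).
HEADER_SCHEMA = json.loads("""
{
  "type": "record",
  "name": "header",
  "fields": [
    {"name": "tenant_id", "type": ["null", "string"], "default": "null"},
    {"name": "unique_id", "type": ["null", "string"], "default": "null"}
  ]
}
""")

# The "header" object from the sample JSON message above.
HEADER_MESSAGE = {
    "product_id": "GU33",
    "tenant_id": "tenant_name",
    "doc_type_id": "s4",
    "doc_type_version": "1.35",
}


def missing_fields(record_schema, message):
    """Names of schema fields that the message does not carry."""
    return [f["name"] for f in record_schema["fields"] if f["name"] not in message]


def suspicious_null_defaults(record_schema):
    """Fields whose union starts with "null" but whose default is not JSON null."""
    flagged = []
    for field in record_schema["fields"]:
        ftype = field["type"]
        is_null_first_union = isinstance(ftype, list) and bool(ftype) and ftype[0] == "null"
        if is_null_first_union and "default" in field and field["default"] is not None:
            flagged.append(field["name"])
    return flagged


print(missing_fields(HEADER_SCHEMA, HEADER_MESSAGE))   # ['unique_id']
print(suspicious_null_defaults(HEADER_SCHEMA))         # ['tenant_id', 'unique_id']
```

The same two helpers can be run against the full schema by loading gu33.avsc with json.load and walking each nested record.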