Ramgopal N created FLUME-3138:
---------------------------------
Summary: Flume events are dropped when schemaURL is set in the Flume
configuration; the sink still expects the schema URL in the event header,
contrary to FLUME-2810
Key: FLUME-3138
URL: https://issues.apache.org/jira/browse/FLUME-3138
Project: Flume
Issue Type: Question
Components: Configuration
Affects Versions: 1.7.0
Environment: Flume 1.7
Reporter: Ramgopal N
I have Avro data arriving on a Kafka topic. Flume reads the events from Kafka
and writes them to HDFS as Parquet data, using a Kite dataset together with
HDFS sink settings.
The Flume configuration is below:
agent.sinks.k1.channel = c1
agent.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
agent.sinks.k1.kite.dataset.uri = dataset:hdfs://namenodeHA/kite/avro_to_parquet_item2
agent.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
agent.sinks.k1.hdfs.filePrefix=parquetdata
agent.sinks.k1.hdfs.fileSuffix = .parquet
agent.sinks.k1.hdfs.fileType=DataStream
#agent.sinks.k1.hdfs.rollInterval=30
#agent.sinks.k1.hdfs.rollCount=1
#agent.sinks.k1.hdfs.batchSize=1
agent.sinks.k1.kite.batchSize=2
agent.sinks.k1.kite.rollInterval=30
agent.sinks.k1.kite.flushable.commitOnBatch=true
#agent.sinks.k1.hdfs.path = hdfs://namenodeHA/user/flumetest
#agent.sinks.k1.serializer.compressionCodec = snappy
agent.sinks.k1.serializer.schemaURL = hdfs://namenodeHA/kite/item.avsc
I am getting the following exception in the Flume logs:
2017-07-31 06:18:40,796 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:153)] Got brand-new compressor [.snappy]
2017-07-31 06:18:40,802 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.kitesdk.data.spi.filesystem.FileSystemWriter.initialize(FileSystemWriter.java:147)] Opened output appender ParquetAppender{path=hdfs://namenodeHA/kite/avro_to_parquet_item2/.6d1019b3-96c4-4334-b737-af260d17aac4.parquet.tmp, schema={"type":"record","name":"item","namespace":"item.avro","fields":[{"name":"i_item_sk","type":..................................{"name":"i_manager_id","type":["null","int"]},{"name":"i_product_name","type":["null","string"]}]}, fileSystem=DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_2077692400_17, ugi=root (auth:SIMPLE)]], avroParquetWriter=parquet.avro.AvroParquetWriter@31ffba30} for hdfs://namenodeHA/kite/avro_to_parquet_item2/6d1019b3-96c4-4334-b737-af260d17aac4.parquet
2017-07-31 06:18:40,803 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.kite.policy.RetryPolicy.handle(RetryPolicy.java:39)] Event delivery failed: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
2017-07-31 06:18:40,803 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: org.apache.flume.sink.kite.NonRecoverableEventException: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
    at org.apache.flume.sink.kite.policy.RetryPolicy.handle(RetryPolicy.java:42)
    at org.apache.flume.sink.kite.DatasetSink.write(DatasetSink.java:375)
    at org.apache.flume.sink.kite.DatasetSink.process(DatasetSink.java:301)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.sink.kite.NonRecoverableEventException: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
    at org.apache.flume.sink.kite.parser.AvroParser.schema(AvroParser.java:185)
    at org.apache.flume.sink.kite.parser.AvroParser.parse(AvroParser.java:155)
    at org.apache.flume.sink.kite.parser.AvroParser.parse(AvroParser.java:56)
    at org.apache.flume.sink.kite.DatasetSink.write(DatasetSink.java:366)
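The error asks for the schema as a per-event header rather than as a sink property. As a rough sketch only, one way to supply such a header might be Flume's static interceptor, which stamps a fixed key/value pair on every event passing through a source. The source name `src1` below is hypothetical (the source definition is not part of the configuration shown above), and whether this resolves the failure has not been verified:

```properties
# Hypothetical source name "src1"; substitute the actual Kafka source name.
agent.sources.src1.interceptors = i1
# Static interceptor: adds one fixed header to each event.
agent.sources.src1.interceptors.i1.type = static
# Header key named in the error message above.
agent.sources.src1.interceptors.i1.key = flume.avro.schema.url
# Same schema location as agent.sinks.k1.serializer.schemaURL.
agent.sources.src1.interceptors.i1.value = hdfs://namenodeHA/kite/item.avsc
```

This keeps the schema location in the agent configuration while delivering it in the form the error message names, namely an event header.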