[
https://issues.apache.org/jira/browse/FLUME-3138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ramgopal N updated FLUME-3138:
------------------------------
Issue Type: Bug (was: Question)
> SchemaURL from flume configuration is dropping the flume events expecting the
> schema url to be added in event header as against FLUME-2810
> ------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLUME-3138
> URL: https://issues.apache.org/jira/browse/FLUME-3138
> Project: Flume
> Issue Type: Bug
> Components: Configuration
> Affects Versions: 1.7.0
> Environment: Flume1.7
> Reporter: Ramgopal N
>
> I have Avro data arriving on a Kafka topic. Flume reads the events from Kafka
> and writes them to HDFS as Parquet data using the Kite dataset sink.
> The Flume config is as follows:
> agent.sinks.k1.channel = c1
> agent.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
> agent.sinks.k1.kite.dataset.uri = dataset:hdfs://namenodeHA/kite/avro_to_parquet_item2
> agent.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
> agent.sinks.k1.hdfs.filePrefix=parquetdata
> agent.sinks.k1.hdfs.fileSuffix = .parquet
> agent.sinks.k1.hdfs.fileType=DataStream
> #agent.sinks.k1.hdfs.rollInterval=30
> #agent.sinks.k1.hdfs.rollCount=1
> #agent.sinks.k1.hdfs.batchSize=1
> agent.sinks.k1.kite.batchSize=2
> agent.sinks.k1.kite.rollInterval=30
> agent.sinks.k1.kite.flushable.commitOnBatch=true
> #agent.sinks.k1.hdfs.path = hdfs://namenodeHA/user/flumetest
> #agent.sinks.k1.serializer.compressionCodec = snappy
> agent.sinks.k1.serializer.schemaURL = hdfs://namenodeHA/kite/item.avsc
> I am getting the following exception in the Flume logs:
> 2017-07-31 06:18:40,796 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO
> - org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:153)]
> Got brand-new compressor [.snappy]
> 2017-07-31 06:18:40,802 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO
> -
> org.kitesdk.data.spi.filesystem.FileSystemWriter.initialize(FileSystemWriter.java:147)]
> Opened output appender
> ParquetAppender{path=hdfs://namenodeHA/kite/avro_to_parquet_item2/.6d1019b3-96c4-4334-b737-af260d17aac4.parquet.tmp,
>
> schema={"type":"record","name":"item","namespace":"item.avro","fields":[{"name":"i_item_sk","type":..................................{"name":"i_manager_id","type":["null","int"]},{"name":"i_product_name","type":["null","string"]}]},
> fileSystem=DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_2077692400_17,
> ugi=root (auth:SIMPLE)]],
> avroParquetWriter=parquet.avro.AvroParquetWriter@31ffba30} for
> hdfs://namenodeHA/kite/avro_to_parquet_item2/6d1019b3-96c4-4334-b737-af260d17aac4.parquet
> 2017-07-31 06:18:40,803 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> [ERROR -
> org.apache.flume.sink.kite.policy.RetryPolicy.handle(RetryPolicy.java:39)]
> Event delivery failed: No schema in event headers. Headers must include
> either flume.avro.schema.url or flume.avro.schema.literal
> 2017-07-31 06:18:40,803 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)]
> Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: org.apache.flume.sink.kite.NonRecoverableEventException: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
> at org.apache.flume.sink.kite.policy.RetryPolicy.handle(RetryPolicy.java:42)
> at org.apache.flume.sink.kite.DatasetSink.write(DatasetSink.java:375)
> at org.apache.flume.sink.kite.DatasetSink.process(DatasetSink.java:301)
> at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flume.sink.kite.NonRecoverableEventException: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
> at org.apache.flume.sink.kite.parser.AvroParser.schema(AvroParser.java:185)
> at org.apache.flume.sink.kite.parser.AvroParser.parse(AvroParser.java:155)
> at org.apache.flume.sink.kite.parser.AvroParser.parse(AvroParser.java:56)
> at org.apache.flume.sink.kite.DatasetSink.write(DatasetSink.java:366)
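>
> A possible workaround, not verified against this setup, is to attach the
> schema URL to every event as a header using Flume's static interceptor.
> In this sketch the source name r1 and the interceptor name i1 are
> assumptions, since the source definition is not part of the config shown
> in this report:
> # r1 and i1 are hypothetical names; substitute the actual Kafka source name
> agent.sources.r1.interceptors = i1
> agent.sources.r1.interceptors.i1.type = static
> agent.sources.r1.interceptors.i1.key = flume.avro.schema.url
> agent.sources.r1.interceptors.i1.value = hdfs://namenodeHA/kite/item.avsc
> With that header on each event, the parser named in the exception should
> find the schema in the event itself instead of relying on the sink's
> serializer.schemaURL setting.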
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)