Hi All,

I have written a consumer that reads from a Kafka topic and writes the data in
Parquet format using StreamingFileSink, but I am getting the following error.
The job runs for some hours and then starts failing with this exception. I
tried to restart it, but it fails with the same exception. After I restart it
from the latest offset it works fine for some hours and then fails again. I am
not able to find the root cause of this issue.

java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.CharSequence
    at org.apache.parquet.avro.AvroWriteSupport.fromAvroString(AvroWriteSupport.java:371)
    at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:346)
    at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
    at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
    at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
    at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
    at org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:50)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:214)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
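
Reading the last "Caused by", my current guess is that some records arrive
with a java.util.ArrayList in a field that the writer schema declares as a
string; AvroWriteSupport casts such a value to CharSequence when it writes a
string field, which would produce exactly this exception. A minimal sketch of
what I think is happening (the record name "Event" and field name "tags" are
just placeholders, not my real schema):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.util.ArrayList;
import java.util.Arrays;

public class SchemaMismatchSketch {
    public static void main(String[] args) {
        // Writer schema, fixed when the sink is built: "tags" is a string.
        Schema writerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
              + "{\"name\":\"tags\",\"type\":\"string\"}]}");

        // A newer schema where the same field evolved into an array.
        Schema newerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
              + "{\"name\":\"tags\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}");

        // A record deserialized against the newer schema carries a List.
        GenericRecord record = new GenericData.Record(newerSchema);
        record.put("tags", new ArrayList<>(Arrays.asList("a", "b")));

        // AvroWriteSupport walks the writer schema, sees a string field, and
        // casts the value to CharSequence -- with this record that gives
        // "java.util.ArrayList cannot be cast to java.lang.CharSequence".
        System.out.println(record.get("tags").getClass()); // class java.util.ArrayList
    }
}

If that is right it would also explain the timing: the job runs fine until a
producer starts publishing records with the evolved schema, and restarting
from the latest offset just skips past the bad records until new ones arrive.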


Code:

DataStream<GenericRecord> sourceStream = env.addSource(kafkaConsumer010);

final StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(schemaSubject)))
        .withBucketAssigner(new EventTimeBucketAssigner())
        .build();

sourceStream.addSink(sink).setParallelism(parallelism);
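
To isolate the offending records instead of crashing the whole job, I am
thinking of putting a validating filter in front of the sink. This is only a
sketch: SchemaUtils.getSchema(schemaSubject) is my own helper from the code
above, the rest is the standard Flink and Avro API as far as I know:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SchemaConformanceFilter extends RichFilterFunction<GenericRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(SchemaConformanceFilter.class);

    private final String schemaString;  // Avro Schema is not serializable, ship it as a string
    private transient Schema writerSchema;

    public SchemaConformanceFilter(Schema writerSchema) {
        this.schemaString = writerSchema.toString();
    }

    @Override
    public void open(Configuration parameters) {
        writerSchema = new Schema.Parser().parse(schemaString);
    }

    @Override
    public boolean filter(GenericRecord record) {
        // validate(...) re-checks every field value against the writer
        // schema, so a List sitting in a string field is caught and logged
        // here instead of surfacing deep inside AvroWriteSupport.
        if (GenericData.get().validate(writerSchema, record)) {
            return true;
        }
        LOG.error("Record does not match writer schema, dropping: {}", record);
        return false;
    }
}

Wired in before the sink:

sourceStream
        .filter(new SchemaConformanceFilter(SchemaUtils.getSchema(schemaSubject)))
        .addSink(sink)
        .setParallelism(parallelism);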

I still need to understand why the job ran fine for a few hours and then
started failing. Please help me understand this.



-- 
Thanks & Regards,
Anuj Jain


<http://www.cse.iitm.ac.in/%7Eanujjain/>
