Hi Till,

Thanks for the reply .
I have doubt that input has problem because :

1. if input has some problem than it should not come in the topic itself as
schema validation fail at producer side only.
2.  i am using the same schema that was used to writed the record in topic
and i am able to parse the record with same schema as when i try to print
the stream its not giving any error , only problem occurring when writing
as parquet.

This is the code that i am using to get the schema that i m passing to
parquetwriter.

public static Schema getSchema(String subjectName) {
    try {
        List<Integer> versions = registryClient.getAllVersions(subjectName);
        SchemaMetadata schemaMeta =
registryClient.getSchemaMetadata(subjectName,
versions.get(versions.size() - 1));
        Schema schema = new Schema.Parser().parse(schemaMeta.getSchema());
        return schema;
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}


How input can pass through and inserted in topic if it has some issue. Even
if its occusring how to find those record and skip that so that because of
one record my whole processing should not fail.

Thanks,
Anuj





On Sat, Feb 29, 2020 at 9:12 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Anuj,
>
> it looks to me that your input GenericRecords don't conform with your
> output schema schemaSubject. At least, the stack trace says that your
> output schema expects some String field but the field was actually some
> ArrayList. Consequently, I would suggest to verify that your input data has
> the right format and if not to filter those records out which are
> non-conformant.
>
> Cheers,
> Till
>
> On Sat, Feb 29, 2020 at 2:13 PM aj <ajainje...@gmail.com> wrote:
>
>> Hi All,
>>
>> i have Written a consumer that read from kafka topic and write the data
>> in parquet format using StreamSink . But i am getting following error. Its
>> runs for some hours than start failing with this excpetions. I tried to
>> restart it but failing with same exceptions.After i restart with latest
>> offset it started working fine for soem hours and than again fail. I am not
>> able to find root cause for this issue.
>>
>> java.lang.Exception: 
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>  Could not forward element to next operator
>>     at 
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>     at java.lang.Thread.run(Thread.java:748)Caused by: 
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>  Could not forward element to next operator
>>     at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>     at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>     at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>     at 
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>>     at 
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>>     at 
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>>     at 
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>>     at 
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>>     at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>     at 
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>     at 
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)Caused
>>  by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to 
>> java.lang.CharSequence
>>     at 
>> org.apache.parquet.avro.AvroWriteSupport.fromAvroString(AvroWriteSupport.java:371)
>>     at 
>> org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:346)
>>     at 
>> org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
>>     at 
>> org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
>>     at 
>> org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
>>     at 
>> org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
>>     at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
>>     at 
>> org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
>>     at 
>> org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:50)
>>     at 
>> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:214)
>>     at 
>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
>>     at 
>> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
>>     at 
>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>
>>
>> *code  :*
>>
>>
>> *DataStream<GenericRecord> sourceStream =
>> env.addSource(kafkaConsumer010);*
>>
>> *
>> final StreamingFileSink<GenericRecord> sink = StreamingFileSink.forBulkFormat
>>                     (path, 
>> ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(schemaSubject)))
>>                     .withBucketAssigner(new EventTimeBucketAssigner())
>>                     .build();
>>
>> sourceStream.addSink(sink).setParallelism(parallelism);*
>>
>> I need to undetstand why its ran for few hours than start failing. Please 
>> help me to understand this.
>>
>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>

Reply via email to