I have defined a streaming file sink that writes my Scala case class to
Parquet.

StreamingFileSink
  .forBulkFormat(
    new Path(appArgs.datalakeBucket),
    ParquetAvroWriters
      .forReflectRecord(classOf[Log])
  )
  .withBucketAssigner(new TransactionLogHiveBucketAssigner())
  .build()


where my case class is

case class Log(
  level: String,
  time_stamp: Option[Long] = None
)


When Flink tries to write a specific instance to Parquet, for example


Log("info",Some(1596975950000))


it throws the following error:


org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: required group time_stamp {
}
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
    at org.apache.parquet.schema.GroupType.accept(GroupType.java:255)
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:31)
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)
    at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
    at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:280)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:283)
    at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
    at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:87)
    at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
    at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter$Factory.openNew(BulkPartWriter.java:103)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:222)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:274)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:445)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:711)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:664)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
    at org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:36)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:373)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)

Can Flink's Parquet writer not handle fields of type Option?
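
My guess, which I have not confirmed against the Avro or Flink sources, is that the reflection-based schema is built from a class's instance fields, and scala.Option itself declares none, which would explain the empty "required group time_stamp". A minimal sketch of how I checked the field layout with plain Java reflection (the object name OptionFieldCheck and the helper instanceFieldNames are mine, purely for illustration, and the check only looks at fields declared directly on the class):

```scala
import java.lang.reflect.Modifier

// Same shape as the case class in the question.
case class Log(level: String, time_stamp: Option[Long] = None)

object OptionFieldCheck {
  // Names of the non-static fields declared directly on a class.
  // Assumption: this approximates what a reflection-based schema
  // generator would see; it does not call Avro or Flink at all.
  def instanceFieldNames(cls: Class[_]): List[String] =
    cls.getDeclaredFields.toList
      .filterNot(f => Modifier.isStatic(f.getModifiers))
      .map(_.getName)

  def main(args: Array[String]): Unit = {
    // The case class exposes its constructor parameters as fields.
    println(s"Log fields:    ${instanceFieldNames(classOf[Log])}")
    // Option is an abstract class; the value lives in Some, not in Option.
    println(s"Option fields: ${instanceFieldNames(classOf[Option[_]])}")
  }
}
```

If that reading is right, the field would need a representation the reflective writer understands before it reaches the sink, but I would like confirmation before restructuring the class.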

Thanks,
