I have defined a streaming file sink for parquet to store my scala case class.
StreamingFileSink .*forBulkFormat(* new Path*(*appArgs.datalakeBucket*)*, ParquetAvroWriters .*forReflectRecord(classOf[*Log*])* * )* .withBucketAssigner*(*new TransactionLogHiveBucketAssigner*())* .build*()* where my class class is Log( level: String, time_stamp: Option[Long] = None ) When Flink tries to write a specific instance to parquet Log("info",Some(1596975950000)) it throws the following error: org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: required group time_stamp { } at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27) at org.apache.parquet.schema.GroupType.accept(GroupType.java:255) at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:31) at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37) at org.apache.parquet.schema.MessageType.accept(MessageType.java:55) at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil .java:23) at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter .java:280) at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:283 ) at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter .java:564) at org.apache.flink.formats.parquet.avro.ParquetAvroWriters .createAvroParquetWriter(ParquetAvroWriters.java:87) at org.apache.flink.formats.parquet.avro.ParquetAvroWriters .lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73) at org.apache.flink.formats.parquet.ParquetWriterFactory.create( ParquetWriterFactory.java:57) at org.apache.flink.streaming.api.functions.sink.filesystem. BulkPartWriter$Factory.openNew(BulkPartWriter.java:103) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket .rollPartFile(Bucket.java:222) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket .write(Bucket.java:212) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets .onElement(Buckets.java:274) at org.apache.flink.streaming.api.functions.sink.filesystem. StreamingFileSink.invoke(StreamingFileSink.java:445) at org.apache.flink.streaming.api.operators.StreamSink.processElement( StreamSink.java:56) at org.apache.flink.streaming.runtime.tasks. OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks. OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks. OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators. AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java: 730) at org.apache.flink.streaming.api.operators. AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java: 708) at org.apache.flink.streaming.api.operators.StreamMap.processElement( StreamMap.java:41) at org.apache.flink.streaming.runtime.tasks. OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks. OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks. OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.runtime.tasks. OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:711) at org.apache.flink.streaming.runtime.tasks. OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:664) at org.apache.flink.streaming.api.operators. AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java: 730) at org.apache.flink.streaming.api.operators. AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java: 708) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:53) at org.apache.flink.streaming.api.functions.windowing. PassThroughWindowFunction.apply(PassThroughWindowFunction.java:36) at org.apache.flink.streaming.runtime.operators.windowing.functions. InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction .java:46) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator .emitWindowContents(WindowOperator.java:549) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator .processElement(WindowOperator.java:373) at org.apache.flink.streaming.runtime.tasks. OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask .java:173) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput .processElement(StreamTaskNetworkInput.java:151) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput .emitNext(StreamTaskNetworkInput.java:128) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor .processInput(StreamOneInputProcessor.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput( StreamTask.java:311) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor .runMailboxLoop(MailboxProcessor.java:187) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop( StreamTask.java:487) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask .java:470) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748) Can Flink Parquet not handle field of type Option? Thanks,