Hi Yun,

Thanks for your answer. I’m not very familiar with Parquet, and I expected it to 
be as easy as using Flink's Table API. But obviously not 😊

The error is gone now, but why did you choose this specific version? Could it be 
a newer or an older one?

Now I get the following error:

org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: required group meldedatum {
}
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
    at org.apache.parquet.schema.GroupType.accept(GroupType.java:255)
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:31)
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)
    at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
    at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:233)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
    at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:530)
    at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:87)
    at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
    at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:69)
    at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:420)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)


Do I have to write an Avro schema? I thought I could use the plain POJOs from my 
DataStream to define the Parquet format.
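
For context, forReflectRecord derives the Avro schema from the class by 
reflection, and Avro's reflection skips static and transient fields. A field 
type whose instance fields are all transient therefore becomes a record without 
fields, which Parquet rejects as an empty group. The sketch below uses only 
plain Java reflection to list the fields that would survive. It is an 
illustration, not Avro's actual code: the names ReflectFieldCheck and 
schemaFields are hypothetical, and it is only an assumption that meldedatum is 
a java.util.Date, since the Event class is not shown in this thread.

```scala
import java.lang.reflect.Modifier

object ReflectFieldCheck {
  // Names of the instance fields a reflection-based schema would keep:
  // walks the class hierarchy and skips static and transient fields.
  def schemaFields(cls: Class[_]): List[String] =
    Iterator
      .iterate[Class[_]](cls)(_.getSuperclass)
      .takeWhile(c => c != null && c != classOf[Object])
      .flatMap(_.getDeclaredFields.iterator)
      .filterNot { f =>
        Modifier.isStatic(f.getModifiers) || Modifier.isTransient(f.getModifiers)
      }
      .map(_.getName)
      .toList

  def main(args: Array[String]): Unit = {
    // Assumption: meldedatum is a java.util.Date (not confirmed in this thread).
    // All instance fields of java.util.Date are transient, so nothing is left.
    println(schemaFields(classOf[java.util.Date]))
  }
}
```

If that turns out to be the cause, storing the date in Event as a Long (epoch 
milliseconds) or a String before the sink is one way to avoid the empty record.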


Best,
Jan

From: Yun Gao <[email protected]>
Sent: Thursday, January 14, 2021 08:34
To: Jan Oelschlegel <[email protected]>; [email protected]
Subject: Re: StreamingFileSink with ParquetAvroWriters

Hi Jan,

Could you try adding this dependency?

<dependency>
   <groupId>org.apache.parquet</groupId>
   <artifactId>parquet-avro</artifactId>
   <version>1.11.1</version>
</dependency>

Best,
 Yun

------------------ Original Mail ------------------
Sender: Jan Oelschlegel <[email protected]>
Send Date: Thu Jan 14 00:49:30 2021
Recipients: [email protected]
Subject: StreamingFileSink with ParquetAvroWriters

Hi,

I’m using Flink (v1.11.2) and would like to use the StreamingFileSink to write 
to HDFS in Parquet format.

As described in the documentation, I have added the following dependency:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-parquet_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>

And this is my file sink definition:


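import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

// Bulk-format sink: the Avro schema is derived from Event by reflection
val sink: StreamingFileSink[Event] = StreamingFileSink
  .forBulkFormat(
    new Path("hdfs://namenode.local:8020/user/datastream/"),
    ParquetAvroWriters.forReflectRecord(classOf[Event])
  )
  .build()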


If I execute this on the cluster, I get the following error:

java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
    at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:84)
    at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
    at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:69)
    at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:420)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)


Looks like there are some dependencies missing. How can I fix this?
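
A NoClassDefFoundError like the one above usually means the named class is not 
on the runtime classpath. As a rough diagnostic, the sketch below checks whether 
a class can be located at all. The helper name isOnClasspath is hypothetical and 
not part of Flink. It only reflects the classpath of the JVM it runs in, so it 
would have to run inside the job on the cluster to say anything about that 
environment.

```scala
object ClasspathCheck {
  // Returns true if the named class can be located by this class's class loader.
  def isOnClasspath(className: String): Boolean =
    try {
      // initialize = false: only locate the class, do not run static initializers
      Class.forName(className, false, getClass.getClassLoader)
      true
    } catch {
      case _: ClassNotFoundException | _: LinkageError => false
    }

  def main(args: Array[String]): Unit = {
    // Class name taken from the NoClassDefFoundError above
    val name = "org.apache.parquet.avro.AvroParquetWriter"
    println(s"$name available: ${isOnClasspath(name)}")
  }
}
```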


Jan O.

