Hi Jan,

Could you make sure you are packaging that dependency with your job jar?
There are instructions on how to configure your build setup[1]. Especially
the part about building a jar with dependencies might come in handy[2].
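For reference, below is a minimal sketch of such a fat-jar setup with the
maven-shade-plugin, assuming a Maven build; the plugin version and the
exclusion list here are only illustrative, the template in [2] has the full
configuration:

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.1.1</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <artifactSet>
              <excludes>
                <!-- already provided by the Flink distribution, no need to bundle -->
                <exclude>org.apache.flink:force-shading</exclude>
                <exclude>com.google.code.findbugs:jsr305</exclude>
                <exclude>org.slf4j:*</exclude>
                <exclude>org.apache.logging.log4j:*</exclude>
              </excludes>
            </artifactSet>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

With something like this, "mvn package" should put the parquet classes
(including AvroParquetWriter) into the job jar, as long as flink-parquet is
in compile scope rather than "provided". You can double-check with something
like "jar tf your-job.jar | grep AvroParquetWriter".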

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#appendix-template-for-building-a-jar-with-dependencies

On 13/01/2021 17:49, Jan Oelschlegel wrote:
>
> Hi,
>
>  
>
> I'm using Flink (v1.11.2) and would like to use the StreamingFileSink
> for writing to HDFS in Parquet format.
>
>  
>
> As described in the documentation, I have added this dependency:
>
>  
>
> <dependency>
>    <groupId>org.apache.flink</groupId>
>    <artifactId>flink-parquet_${scala.binary.version}</artifactId>
>    <version>${flink.version}</version>
> </dependency>
>
>  
>
> And this is my file sink definition:
>
>  
>
> val sink: StreamingFileSink[Event] = StreamingFileSink
>   .forBulkFormat(
>     new Path("hdfs://namenode.local:8020/user/datastream/"),
>     ParquetAvroWriters.forReflectRecord(classOf[Event])
>   )
>   .build()
>
>  
>
>  
>
> If I execute this in cluster I get the following error:
>
>  
>
> java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
>     at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:84)
>     at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
>     at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:69)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:420)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
>  
>
>  
>
> Looks like there are some dependencies missing. How can I fix this?
>
>  
>
>  
>
> Jan O.
>
> NOTICE: This is a confidential message and is intended only for the
> addressee. Copying this message or making it available to third parties
> is not permitted. If you have received this message in error, please
> notify me by e-mail or at the telephone number given above.
