Hi Eyal,

This is a known issue which has already been fixed (see [1]) and will be
part of the upcoming releases.
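
Until a release containing the fix is out, one possible workaround is to
avoid serializing the Hadoop codec altogether: keep only the codec name (a
plain String) in a custom BulkWriter.Factory and resolve the codec lazily on
the task manager. Below is a minimal sketch under that assumption; the class
name LazyCompressWriterFactory and the newline-per-record framing are my own
illustrative choices, not part of flink-compress.

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical workaround: the factory holds only a String, so Java
// serialization of the job graph never touches the non-serializable codec.
public class LazyCompressWriterFactory implements BulkWriter.Factory<String> {

    private final String codecName; // e.g. "Gzip"

    public LazyCompressWriterFactory(String codecName) {
        this.codecName = codecName;
    }

    @Override
    public BulkWriter<String> create(FSDataOutputStream out) throws IOException {
        // Resolve the codec here, at runtime on the task manager, instead of
        // shipping a GzipCodec instance from the client.
        CompressionCodec codec =
                new CompressionCodecFactory(new Configuration()).getCodecByName(codecName);
        CompressionOutputStream compressed = codec.createOutputStream(out);

        return new BulkWriter<String>() {
            @Override
            public void addElement(String element) throws IOException {
                compressed.write(element.getBytes(StandardCharsets.UTF_8));
                compressed.write('\n'); // one record per line (assumed framing)
            }

            @Override
            public void flush() throws IOException {
                compressed.flush();
            }

            @Override
            public void finish() throws IOException {
                // Finish the compressed stream; Flink closes the underlying
                // file stream itself.
                compressed.finish();
            }
        };
    }
}

The factory then plugs into the same sink as before, e.g.
StreamingFileSink.forBulkFormat(path, new LazyCompressWriterFactory("Gzip")).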

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-16371

On Tue, Mar 24, 2020 at 11:10 AM Eyal Pe'er <eyal.p...@startapp.com> wrote:
>
> Hi all,
>
> I am trying to write a sink function that receives strings and writes
> compressed files into time buckets.
>
> The code is pretty straightforward and based on CompressWriterFactoryTest:
>
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.formats.compress.CompressWriterFactory;
> import org.apache.flink.formats.compress.CompressWriters;
> import org.apache.flink.formats.compress.extractor.DefaultExtractor;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
> import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
> import org.apache.hadoop.conf.Configuration;
>
> CompressWriterFactory<String> writer = CompressWriters
>         .forExtractor(new DefaultExtractor<String>())
>         .withHadoopCompression("Gzip", new Configuration());
>
> StreamingFileSink.forBulkFormat(new Path(getTargetPath()), writer)
>         .withBucketAssigner(new DateTimeBucketAssigner<>(
>                 getDataTimeBucketFormat(getDataTimeBucket())))
>         .build();
>
> When I tried to add it as a sink (dataStream.addSink), the app crashed with:
>
>
>
> org.apache.hadoop.io.compress.GzipCodec@55e3d6c3 is not serializable. The object probably contains or references non serializable fields.
>
>
>
> Well, I guess I did something wrong, but I am not sure what?
>
> Or maybe I should convert the SinkFunction to a serializable one, but how
> can I do that?
>
> Best regards
>
> Eyal Peer
>
>
