As for StreamingFileSink and compressed output, see the
StreamingFileSink.forBulkFormat and BulkWriter.Factory classes. A simple example
(using Apache commons-compress for the gzip encoder):

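  import java.nio.charset.StandardCharsets
  import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream
  import org.apache.flink.api.common.serialization.BulkWriter
  import org.apache.flink.core.fs.{FSDataOutputStream, Path}
  import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

  val writer = new BulkWriter.Factory[String] {
    override def create(out: FSDataOutputStream): BulkWriter[String] =
      new BulkWriter[String] {
        // gzip-encode everything written to the part file
        val compressed = new GzipCompressorOutputStream(out)
        // one record per line, UTF-8 encoded
        override def addElement(element: String): Unit =
          compressed.write((element + "\n").getBytes(StandardCharsets.UTF_8))
        override def flush(): Unit = compressed.flush()
        // write the gzip trailer but do not close `out`: Flink closes the part file itself
        override def finish(): Unit = compressed.finish()
      }
  }
  val sink = StreamingFileSink.forBulkFormat[String](new Path("/some/path"), writer).build()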
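
To check the gzip encoding outside of a Flink job, the same addElement/flush/finish
sequence can be exercised against an in-memory stream. This is only a sketch under
stated assumptions: it swaps Flink's FSDataOutputStream for a plain
java.io.ByteArrayOutputStream and uses the JDK's java.util.zip.GZIPOutputStream
instead of commons-compress, so it runs with no extra dependencies. The names
GzipLineWriter, TrackingStream and GzipLineWriterDemo are hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.io.Source

// Hypothetical stand-in for the BulkWriter above: same addElement/flush/finish
// sequence, but over any OutputStream and using the JDK's gzip encoder.
class GzipLineWriter(out: OutputStream) {
  private val compressed = new GZIPOutputStream(out)

  // One record per line, UTF-8 encoded.
  def addElement(element: String): Unit =
    compressed.write((element + "\n").getBytes(StandardCharsets.UTF_8))

  def flush(): Unit = compressed.flush()

  // Writes the remaining compressed data and the gzip trailer, but leaves `out` open.
  def finish(): Unit = compressed.finish()
}

// Hypothetical helper stream that records whether close() was ever called.
class TrackingStream extends ByteArrayOutputStream {
  var closed = false
  override def close(): Unit = { closed = true; super.close() }
}

object GzipLineWriterDemo {
  // Decompresses a complete gzip payload back into its lines.
  def gunzipLines(bytes: Array[Byte]): List[String] = {
    val source = Source.fromInputStream(
      new GZIPInputStream(new ByteArrayInputStream(bytes)), "UTF-8")
    try source.getLines().toList finally source.close()
  }

  def main(args: Array[String]): Unit = {
    val part = new TrackingStream
    val writer = new GzipLineWriter(part)
    Seq("first", "second").foreach(writer.addElement)
    writer.finish()
    println(part.closed)                   // false
    println(gunzipLines(part.toByteArray)) // List(first, second)
  }
}
```

The stream is still open after finish(), yet its bytes already form a complete gzip
file. That matches what the sink expects from a BulkWriter, since Flink closes the
part file itself after calling finish().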

There are still some usability issues with StreamingFileSink (for example, you
cannot customize the names of the resulting part files), but these are
scheduled to be fixed in Flink 1.10.

Roman Grebennikov | g...@dfdx.me


On Fri, Oct 11, 2019, at 23:07, John O wrote:
> Hello,
>
> Question 1
> I don’t see any reference material showing how to write compressed (gzip)
> files with StreamingFileSink. Can someone point me in the right direction?
>
> Question 2
> We currently have a use case for a “StreamingFileProcessFunction”. Basically
> we need an output for the StreamingFileSink that will be used by a downstream
> processor. What would be the best way to implement this feature?
>
> Best,
> Song
