Hi Austin,

I will have a look at your repo. In the meantime, given that [1] is already merged in 1.10, would upgrading to 1.10 and using the newly introduced CompressWriterFactory be an option for you? It is unfortunate that this feature was not documented.
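A rough sketch of what that could look like (untested; it assumes the flink-compress module from 1.10 is on the classpath, Record is the POJO from your example further down, and DefaultExtractor serializes each record via its String representation):

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.compress.CompressWriters;
import org.apache.flink.formats.compress.extractor.DefaultExtractor;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// "Gzip" is resolved through Hadoop's CompressionCodecFactory, so this variant
// also needs a Hadoop dependency. Note that it produces a compressed stream of
// records, not a tar archive with one entry per record.
StreamingFileSink<Record> sink = StreamingFileSink
    .forBulkFormat(
        new Path("file:///home/austin/Downloads/text-output"),
        CompressWriters.forExtractor(new DefaultExtractor<Record>())
            .withHadoopCompression("Gzip"))
    .build();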
Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-13634

On Tue, Mar 3, 2020 at 11:13 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

Hi all,

Thanks for the docs pointer/FLIP, Rafi, and the workaround strategy, Kostas -- strange, though, as I wasn't using a bounded source when I first ran into this issue. I have updated the example repo to use an unbounded source [1], and the same file corruption problems remain.

Anything else I could be doing wrong with the compression stream?

Thanks again,
Austin

[1]: https://github.com/austince/flink-streaming-file-sink-compression/tree/unbounded

On Tue, Mar 3, 2020 at 3:50 AM Kostas Kloudas <kklou...@apache.org> wrote:

Hi Austin and Rafi,

@Rafi Thanks for providing the pointers! Unfortunately there is no progress on the FLIP (or the issue).

@Austin In the meantime, what you could do -- assuming that your input is bounded -- is simply not stop the job after the whole input is processed, then wait until the output is committed, and then cancel the job. I know, and I agree, that this is not an elegant solution, but it is a temporary workaround.

Hopefully the FLIP and the related issue are going to be prioritised soon.

Cheers,
Kostas

On Tue, Mar 3, 2020 at 8:04 AM Rafi Aroch <rafi.ar...@gmail.com> wrote:

Hi,

This happens because StreamingFileSink does not support a finite input stream. The docs mention it under "Important Considerations":

[screenshot of the "Important Considerations" note from the StreamingFileSink documentation]

This behaviour often surprises users...

There's a FLIP <https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs> and an issue <https://issues.apache.org/jira/browse/FLINK-13103> about fixing this. I'm not sure what the status is, though -- maybe Kostas can share.

Thanks,
Rafi

On Mon, Mar 2, 2020 at 5:05 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

Hi Dawid and Kostas,

Sorry for the late reply + thank you for the troubleshooting. I put together an example repo that reproduces the issue [1], because I did have checkpointing enabled in my previous case -- I must still be doing something wrong with that config, though.

Thanks!
Austin

[1]: https://github.com/austince/flink-streaming-file-sink-compression

On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas <kklou...@apache.org> wrote:

Hi Austin,

Dawid is correct in that you need to enable checkpointing for the StreamingFileSink to work.

I hope this solves the problem,
Kostas

On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi Austin,

If I am not mistaken, the StreamingFileSink by default flushes on checkpoints. If you don't have checkpoints enabled, it might happen that not all data is flushed.

I think you can also adjust that behaviour with:

forBulkFormat(...)
    .withRollingPolicy(/* your custom logic */)

I also cc Kostas, who should be able to correct me if I am wrong.
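A minimal sketch of the checkpoint-driven setup (assuming Flink 1.10, where the bulk-format builder accepts a CheckpointRollingPolicy; outputPath and bulkWriterFactory stand in for the Path and BulkWriter.Factory from the original example further down):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Bulk-encoded part files are only finalized on checkpoints, so checkpointing
// must be enabled for the data to be committed.
env.enableCheckpointing(10_000L);

StreamingFileSink<Record> sink = StreamingFileSink
    .forBulkFormat(outputPath, bulkWriterFactory)
    // Bulk formats can only roll on checkpoints; OnCheckpointRollingPolicy is the default.
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();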
Best,

Dawid

On 22/02/2020 01:59, Austin Cawley-Edwards wrote:

Hi there,

Using Flink 1.9.1, I'm trying to write .tgz files with the StreamingFileSink#BulkWriter. It seems like flushing the output stream doesn't flush all the data written. I've verified that I can create valid files using the same APIs and data on their own, so I'm thinking it must be something I'm doing wrong with the bulk format. I'm writing to the local filesystem, with the `file://` protocol.

For tar/gzipping, I'm using the Apache Commons Compress library, version 1.20.

Here's a runnable example of the issue:

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

class Scratch {
  public static class Record implements Serializable {
    private static final long serialVersionUID = 1L;

    String id;

    public Record() {}

    public Record(String id) {
      this.id = id;
    }

    public String getId() {
      return id;
    }

    public void setId(String id) {
      this.id = id;
    }
  }

  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Standalone sanity check: writing the same data directly produces a valid archive.
    TarArchiveOutputStream taos = new TarArchiveOutputStream(
        new GzipCompressorOutputStream(new FileOutputStream("/home/austin/Downloads/test.tgz")));
    TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", "test"));
    String fullText = "hey\nyou\nwork";
    byte[] fullTextData = fullText.getBytes();
    fileEntry.setSize(fullTextData.length);
    taos.putArchiveEntry(fileEntry);
    taos.write(fullTextData, 0, fullTextData.length);
    taos.closeArchiveEntry();
    taos.flush();
    taos.close();

    StreamingFileSink<Record> textSink = StreamingFileSink
        .forBulkFormat(new Path("file:///home/austin/Downloads/text-output"),
            new BulkWriter.Factory<Record>() {
              @Override
              public BulkWriter<Record> create(FSDataOutputStream out) throws IOException {
                final TarArchiveOutputStream compressedOutputStream =
                    new TarArchiveOutputStream(new GzipCompressorOutputStream(out));

                return new BulkWriter<Record>() {
                  @Override
                  public void addElement(Record record) throws IOException {
                    TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", record.id));
                    byte[] fullTextData = "hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8);
                    fileEntry.setSize(fullTextData.length);
                    compressedOutputStream.putArchiveEntry(fileEntry);
                    compressedOutputStream.write(fullTextData, 0, fullTextData.length);
                    compressedOutputStream.closeArchiveEntry();
                  }

                  @Override
                  public void flush() throws IOException {
                    compressedOutputStream.flush();
                  }
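                  // Note: finish() below only delegates to flush(). Nothing ever calls
                  // TarArchiveOutputStream#finish() (which writes the tar EOF records) or
                  // GzipCompressorOutputStream#finish() (which writes the deflated data
                  // and the gzip trailer), so the archive is left truncated -- likely the
                  // "unexpected end of file" seen further down. A fixed finish() would
                  // probably need references to both streams, roughly:
                  //   tarOut.finish();   // tar EOF records
                  //   gzipOut.finish();  // deflated data + gzip trailer
                  // without closing the underlying FSDataOutputStream.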
                  @Override
                  public void finish() throws IOException {
                    this.flush();
                  }
                };
              }
            })
        .withBucketCheckInterval(1000)
        .build();

    env
        .fromElements(new Record("1"), new Record("2"))
        .addSink(textSink)
        .name("Streaming File Sink")
        .uid("streaming-file-sink");
    env.execute("streaming file sink test");
  }
}

From the stat/hex dumps, you can see that the first bits are there but are then cut off:

~/Downloads » stat test.tgz
  File: test.tgz
  Size: 114        Blocks: 8          IO Block: 4096   regular file
Device: 801h/2049d Inode: 30041077    Links: 1
Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
Access: 2020-02-21 19:30:06.009028283 -0500
Modify: 2020-02-21 19:30:44.509424406 -0500
Change: 2020-02-21 19:30:44.509424406 -0500
 Birth: -

~/Downloads » tar -tvf test.tgz
-rw-r--r-- 0/0              12 2020-02-21 19:35 test.txt

~/Downloads » hd test.tgz
00000000  1f 8b 08 00 00 00 00 00  00 ff ed cf 31 0e 80 20  |............1.. |
00000010  0c 85 61 66 4f c1 09 cc  2b 14 3c 8f 83 89 89 03  |..afO...+.<.....|
00000020  09 94 a8 b7 77 30 2e ae  8a 2e fd 96 37 f6 af 4c  |....w0......7..L|
00000030  45 7a d9 c4 34 04 02 22  b3 c5 e9 be 00 b1 25 1f  |Ez..4.."......%.|
00000040  1d 63 f0 81 82 05 91 77  d1 58 b4 8c ba d4 22 63  |.c.....w.X...."c|
00000050  36 78 7c eb fe dc 0b 69  5f 98 a7 bd db 53 ed d6  |6x|....i_....S..|
00000060  94 97 bf 5b 94 52 4a 7d  e7 00 4d ce eb e7 00 08  |...[.RJ}..M.....|
00000070  00 00                                             |..|
00000072

text-output/37 » tar -xzf part-0-0

gzip: stdin: unexpected end of file
tar: Child returned status 1
tar: Error is not recoverable: exiting now

text-output/37 » stat part-0-0
  File: part-0-0
  Size: 10         Blocks: 8          IO Block: 4096   regular file
Device: 801h/2049d Inode: 4590487     Links: 1
Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
Access: 2020-02-21 19:33:06.258888702 -0500
Modify: 2020-02-21 19:33:04.466870139 -0500
Change: 2020-02-21 19:33:05.294878716 -0500
 Birth: -

text-output/37 » hd part-0-0
00000000  1f 8b 08 00 00 00 00 00  00 ff                    |..........|
0000000a

Is there anything simple I'm missing?

Best,
Austin
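Those 10 bytes in part-0-0 (1f 8b 08 00 00 00 00 00 00 ff) are exactly a gzip header with nothing behind it, which points at the data still sitting inside the compressor rather than at the filesystem. A minimal standalone sketch of that behaviour (assuming commons-compress 1.20, where GzipCompressorOutputStream writes its header eagerly in the constructor and buffers deflated data until finish()):

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;

public class GzipFlushDemo {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    GzipCompressorOutputStream gz = new GzipCompressorOutputStream(bos);
    gz.write("hey\nyou\nwork".getBytes(StandardCharsets.UTF_8));
    gz.flush();
    // Expected: 10 bytes -- just the gzip header; the payload is still
    // buffered in the Deflater.
    System.out.println("after flush:  " + bos.size() + " bytes");
    gz.finish();
    // Expected: header + deflated data + 8-byte gzip trailer.
    System.out.println("after finish: " + bos.size() + " bytes");
  }
}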