Hi Austin,

I will have a look at your repo. In the meantime, given that [1] is already
merged in 1.10,
would upgrading to 1.10 and using the newly introduced
CompressWriterFactory be an option for you?

It is unfortunate that this feature was not documented.
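
For reference, here is a rough sketch of how it could be wired in (a sketch
only: it assumes the flink-compress module from 1.10 plus a Hadoop gzip codec
on the classpath, and the record type, path, and variable names are just
illustrative):

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.compress.CompressWriterFactory;
import org.apache.flink.formats.compress.CompressWriters;
import org.apache.flink.formats.compress.extractor.DefaultExtractor;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Gzip-compress each record into bulk part files
// (DefaultExtractor turns records into bytes).
CompressWriterFactory<String> writerFactory = CompressWriters
    .forExtractor(new DefaultExtractor<String>())
    .withHadoopCompression("Gzip");

StreamingFileSink<String> sink = StreamingFileSink
    .forBulkFormat(new Path("file:///tmp/compressed-output"), writerFactory)
    .build();

Part files are still rolled and committed on checkpoints, so checkpointing
needs to be enabled for the output to be finalized.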

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-13634


On Tue, Mar 3, 2020 at 11:13 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi all,
>
> Thanks for the docs pointer/ FLIP Rafi, and the workaround strategy Kostas
> -- strange though, as I wasn't using a bounded source when I first ran into
> this issue. I have updated the example repo to use an unbounded source[1],
> and the same file corruption problems remain.
>
> Anything else I could be doing wrong with the compression stream?
>
> Thanks again,
> Austin
>
> [1]:
> https://github.com/austince/flink-streaming-file-sink-compression/tree/unbounded
>
> On Tue, Mar 3, 2020 at 3:50 AM Kostas Kloudas <kklou...@apache.org> wrote:
>
>> Hi Austin and Rafi,
>>
>> @Rafi Thanks for providing the pointers!
>> Unfortunately there is no progress on the FLIP (or the issue).
>>
>> @Austin In the meantime, what you could do -- assuming that your input is
>> bounded -- is simply not stop the job after the whole input is
>> processed, then wait until the output is committed, and then cancel the
>> job. I know and agree that this is not an elegant solution, but it is a
>> temporary workaround.
>>
>> Hopefully the FLIP and the related issue will be prioritised soon.
>>
>> Cheers,
>> Kostas
>>
>> On Tue, Mar 3, 2020 at 8:04 AM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> This happens because StreamingFileSink does not support a finite input
>>> stream.
>>> In the docs it's mentioned under "Important Considerations":
>>>
>>> [image: screenshot of the "Important Considerations" note from the StreamingFileSink docs]
>>>
>>> This behaviour often surprises users...
>>>
>>> There's a FLIP
>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs>
>>> and an issue <https://issues.apache.org/jira/browse/FLINK-13103> about
>>> fixing this. I'm not sure what the status is, though; maybe Kostas can share.
>>>
>>> Thanks,
>>> Rafi
>>>
>>>
>>> On Mon, Mar 2, 2020 at 5:05 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
>>>> Hi Dawid and Kostas,
>>>>
>>>> Sorry for the late reply + thank you for the troubleshooting. I put
>>>> together an example repo that reproduces the issue[1], because I did have
>>>> checkpointing enabled in my previous case -- still must be doing something
>>>> wrong with that config though.
>>>>
>>>> Thanks!
>>>> Austin
>>>>
>>>> [1]: https://github.com/austince/flink-streaming-file-sink-compression
>>>>
>>>>
>>>> On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas <kklou...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Austin,
>>>>>
>>>>> Dawid is correct in that you need to enable checkpointing for the
>>>>> StreamingFileSink to work.
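>>>>>
>>>>> For example (a minimal sketch; the 10s interval is just illustrative):
>>>>>   env.enableCheckpointing(10_000); // part files are rolled and committed on checkpoints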
>>>>>
>>>>> I hope this solves the problem,
>>>>> Kostas
>>>>>
>>>>> On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
>>>>> <dwysakow...@apache.org> wrote:
>>>>> >
>>>>> > Hi Austin,
>>>>> >
>>>>> > If I am not mistaken, the StreamingFileSink by default flushes on
>>>>> checkpoints. If you don't have checkpoints enabled, it might happen that
>>>>> not all data is flushed.
>>>>> >
>>>>> > I think you can also adjust that behavior with:
>>>>> >
>>>>> > forBulkFormat(...)
>>>>> >
>>>>> > .withRollingPolicy(/* your custom logic */)
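>>>>> >
>>>>> > For instance, a rough sketch (assuming a Flink version where the bulk
>>>>> > builder exposes withRollingPolicy; bulk formats only accept
>>>>> > checkpoint-based policies, and outputPath / myBulkWriterFactory are
>>>>> > placeholders):
>>>>> >
>>>>> > StreamingFileSink
>>>>> >     .forBulkFormat(new Path(outputPath), myBulkWriterFactory)
>>>>> >     .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>>> >     .build();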
>>>>> >
>>>>> > I also cc Kostas who should be able to correct me if I am wrong.
>>>>> >
>>>>> > Best,
>>>>> >
>>>>> > Dawid
>>>>> >
>>>>> > On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
>>>>> >
>>>>> > Hi there,
>>>>> >
>>>>> > Using Flink 1.9.1, trying to write .tgz files with the
>>>>> StreamingFileSink#BulkWriter. It seems like flushing the output stream
>>>>> doesn't flush all the data written. I've verified I can create valid files
>>>>> using the same APIs and data on their own, so thinking it must be something
>>>>> I'm doing wrong with the bulk format. I'm writing to the local filesystem,
>>>>> with the `file://` protocol.
>>>>> >
>>>>> > For Tar/Gzipping, I'm using the Apache Commons Compress library,
>>>>> version 1.20.
>>>>> >
>>>>> > Here's a runnable example of the issue:
>>>>> >
>>>>> > import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
>>>>> > import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
>>>>> > import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
>>>>> > import org.apache.flink.api.common.serialization.BulkWriter;
>>>>> > import org.apache.flink.core.fs.FSDataOutputStream;
>>>>> > import org.apache.flink.core.fs.Path;
>>>>> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> > import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>>>>> >
>>>>> > import java.io.FileOutputStream;
>>>>> > import java.io.IOException;
>>>>> > import java.io.Serializable;
>>>>> > import java.nio.charset.StandardCharsets;
>>>>> >
>>>>> > class Scratch {
>>>>> >   public static class Record implements Serializable {
>>>>> >     private static final long serialVersionUID = 1L;
>>>>> >
>>>>> >     String id;
>>>>> >
>>>>> >     public Record() {}
>>>>> >
>>>>> >     public Record(String id) {
>>>>> >       this.id = id;
>>>>> >     }
>>>>> >
>>>>> >     public String getId() {
>>>>> >       return id;
>>>>> >     }
>>>>> >
>>>>> >     public void setId(String id) {
>>>>> >       this.id = id;
>>>>> >     }
>>>>> >   }
>>>>> >
>>>>> >   public static void main(String[] args) throws Exception {
>>>>> >     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> >
>>>>> >     TarArchiveOutputStream taos = new TarArchiveOutputStream(new GzipCompressorOutputStream(new FileOutputStream("/home/austin/Downloads/test.tgz")));
>>>>> >     TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", "test"));
>>>>> >     String fullText = "hey\nyou\nwork";
>>>>> >     byte[] fullTextData = fullText.getBytes();
>>>>> >     fileEntry.setSize(fullTextData.length);
>>>>> >     taos.putArchiveEntry(fileEntry);
>>>>> >     taos.write(fullTextData, 0, fullTextData.length);
>>>>> >     taos.closeArchiveEntry();
>>>>> >     taos.flush();
>>>>> >     taos.close();
>>>>> >
>>>>> >     StreamingFileSink<Record> textSink = StreamingFileSink
>>>>> >         .forBulkFormat(new Path("file:///home/austin/Downloads/text-output"),
>>>>> >             new BulkWriter.Factory<Record>() {
>>>>> >               @Override
>>>>> >               public BulkWriter<Record> create(FSDataOutputStream out) throws IOException {
>>>>> >                 final TarArchiveOutputStream compressedOutputStream = new TarArchiveOutputStream(new GzipCompressorOutputStream(out));
>>>>> >
>>>>> >                 return new BulkWriter<Record>() {
>>>>> >                   @Override
>>>>> >                   public void addElement(Record record) throws IOException {
>>>>> >                     TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", record.id));
>>>>> >                     byte[] fullTextData = "hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8);
>>>>> >                     fileEntry.setSize(fullTextData.length);
>>>>> >                     compressedOutputStream.putArchiveEntry(fileEntry);
>>>>> >                     compressedOutputStream.write(fullTextData, 0, fullTextData.length);
>>>>> >                     compressedOutputStream.closeArchiveEntry();
>>>>> >                   }
>>>>> >
>>>>> >                   @Override
>>>>> >                   public void flush() throws IOException {
>>>>> >                     compressedOutputStream.flush();
>>>>> >                   }
>>>>> >
>>>>> >                   @Override
>>>>> >                   public void finish() throws IOException {
>>>>> >                     this.flush();
>>>>> >                   }
>>>>> >                 };
>>>>> >               }
>>>>> >             })
>>>>> >         .withBucketCheckInterval(1000)
>>>>> >         .build();
>>>>> >
>>>>> >     env
>>>>> >         .fromElements(new Record("1"), new Record("2"))
>>>>> >         .addSink(textSink)
>>>>> >         .name("Streaming File Sink")
>>>>> >         .uid("streaming-file-sink");
>>>>> >     env.execute("streaming file sink test");
>>>>> >   }
>>>>> > }
>>>>> >
>>>>> >
>>>>> > From the stat/ hex dumps, you can see that the first bits are there,
>>>>> but are then cut off:
>>>>> >
>>>>> > ~/Downloads » stat test.tgz
>>>>> >   File: test.tgz
>>>>> >   Size: 114       Blocks: 8          IO Block: 4096   regular file
>>>>> > Device: 801h/2049d Inode: 30041077    Links: 1
>>>>> > Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
>>>>> > Access: 2020-02-21 19:30:06.009028283 -0500
>>>>> > Modify: 2020-02-21 19:30:44.509424406 -0500
>>>>> > Change: 2020-02-21 19:30:44.509424406 -0500
>>>>> >  Birth: -
>>>>> >
>>>>> > ~/Downloads » tar -tvf test.tgz
>>>>> > -rw-r--r-- 0/0              12 2020-02-21 19:35 test.txt
>>>>> >
>>>>> > ~/Downloads » hd test.tgz
>>>>> > 00000000  1f 8b 08 00 00 00 00 00  00 ff ed cf 31 0e 80 20  |............1.. |
>>>>> > 00000010  0c 85 61 66 4f c1 09 cc  2b 14 3c 8f 83 89 89 03  |..afO...+.<.....|
>>>>> > 00000020  09 94 a8 b7 77 30 2e ae  8a 2e fd 96 37 f6 af 4c  |....w0......7..L|
>>>>> > 00000030  45 7a d9 c4 34 04 02 22  b3 c5 e9 be 00 b1 25 1f  |Ez..4.."......%.|
>>>>> > 00000040  1d 63 f0 81 82 05 91 77  d1 58 b4 8c ba d4 22 63  |.c.....w.X...."c|
>>>>> > 00000050  36 78 7c eb fe dc 0b 69  5f 98 a7 bd db 53 ed d6  |6x|....i_....S..|
>>>>> > 00000060  94 97 bf 5b 94 52 4a 7d  e7 00 4d ce eb e7 00 08  |...[.RJ}..M.....|
>>>>> > 00000070  00 00                                             |..|
>>>>> > 00000072
>>>>> >
>>>>> >
>>>>> >
>>>>> > text-output/37 » tar -xzf part-0-0
>>>>> >
>>>>> > gzip: stdin: unexpected end of file
>>>>> > tar: Child returned status 1
>>>>> > tar: Error is not recoverable: exiting now
>>>>> >
>>>>> > text-output/37 » stat part-0-0
>>>>> >   File: part-0-0
>>>>> >   Size: 10              Blocks: 8          IO Block: 4096   regular file
>>>>> > Device: 801h/2049d      Inode: 4590487     Links: 1
>>>>> > Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
>>>>> > Access: 2020-02-21 19:33:06.258888702 -0500
>>>>> > Modify: 2020-02-21 19:33:04.466870139 -0500
>>>>> > Change: 2020-02-21 19:33:05.294878716 -0500
>>>>> >  Birth: -
>>>>> >
>>>>> > text-output/37 » hd part-0-0
>>>>> > 00000000  1f 8b 08 00 00 00 00 00  00 ff                    |..........|
>>>>> > 0000000a
>>>>> >
>>>>> > Is there anything simple I'm missing?
>>>>> >
>>>>> > Best,
>>>>> > Austin
>>>>>
>>>>
