Hi Kostas,

Thank you.
I'm currently testing my job against a small file, so it's finishing before
the checkpointing starts.
But even if it were a larger file and a checkpoint did happen, there would
always be trailing events arriving after the last checkpoint and before the
source finishes.
So would these events be lost?
In that case, any flow of the form (bounded stream) => (StreamingFileSink)
would not give the expected results...

The alternative would be to use BucketingSink, but it does not guarantee
exactly-once delivery into S3, which is not preferable.

Can you suggest a workaround? Is there some way to make sure a checkpoint is
triggered at the end?
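
For example (just to illustrate what I mean; the interval value is
arbitrary), I can shorten the checkpoint interval so that a checkpoint is
likely to complete before the bounded source finishes:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// A short interval makes it more likely that at least one checkpoint
// completes before the source reaches its end.
env.enableCheckpointing(1000);

But that still leaves a gap between the last completed checkpoint and the
end of the stream, so it doesn't really solve the trailing-events problem.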

Rafi


On Thu, Mar 21, 2019 at 9:40 PM Kostas Kloudas <k.klou...@ververica.com>
wrote:

> Sorry Rafi,
>
> I just read your previous response where you say that you have already
> activated checkpointing.
> My bad for not paying attention.
>
> Unfortunately, in-progress files currently only roll (or get finalized) on
> checkpoint barriers, and NOT when close() is called.
> This is because, at the function level, Flink does not differentiate
> between failures and normal termination.
> But there are plans to fix this:
> https://issues.apache.org/jira/browse/FLINK-2646
>
> So given the above, you should check whether any checkpoint completes in
> your pipeline before your source stream reaches its end. If there are no
> checkpoints, your in-progress files will not be finalized, and Parquet,
> for example, will not write the footer that is needed to properly read
> the file.
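>
> As a quick way to check this (a rough sketch; the class and the logging
> are just illustrative), you could put a pass-through operator implementing
> CheckpointListener into the pipeline and log completed checkpoints:
>
> public static class CheckpointLogger<T> extends RichMapFunction<T, T>
>         implements CheckpointListener {
>
>     @Override
>     public T map(T value) {
>         return value; // pass-through, we only care about the callback
>     }
>
>     @Override
>     public void notifyCheckpointComplete(long checkpointId) {
>         // If this never prints, no checkpoint completed before the end.
>         System.out.println("Checkpoint " + checkpointId + " completed");
>     }
> }
>
> Adding it anywhere in the chain, e.g. .map(new CheckpointLogger<>()), will
> tell you whether checkpoints actually complete before the job finishes.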
>
> Kostas
>
>
> On Thu, Mar 21, 2019 at 8:03 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>
>> Hi Kostas,
>>
>> Yes I have.
>>
>> Rafi
>>
>> On Thu, Mar 21, 2019, 20:47 Kostas Kloudas <kklou...@gmail.com> wrote:
>>
>>> Hi Rafi,
>>>
>>> Have you enabled checkpointing for your job?
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Thu, Mar 21, 2019 at 5:18 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>>
>>>> Hi Piotr and Kostas,
>>>>
>>>> Thanks for your reply.
>>>>
>>>> The issue is that I don't see any committed files, only in-progress ones.
>>>> I tried to debug the code for more details. I see that in
>>>> *BulkPartWriter* I do reach the *write* methods and see events getting
>>>> written, but I never reach *closeForCommit*. Instead, it goes straight
>>>> to the *close* function, where all parts are disposed.
>>>>
>>>> In my job I have a finite stream (the source is reading from Parquet
>>>> file/s), doing some windowed aggregation and writing the result back to
>>>> a Parquet file.
>>>> As far as I know, it should commit files during checkpoints and when
>>>> the stream has finished. I did enable checkpointing.
>>>> I did verify that if I connect to other sinks, I see the events.
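>>>>
>>>> For reference, the job is shaped roughly like this (a simplified sketch;
>>>> the input format, key selector, window and aggregation are placeholders
>>>> for my real logic):
>>>>
>>>> env.enableCheckpointing(10000); // checkpointing is enabled
>>>>
>>>> DataStream<SomeProtoType> events = env.createInput(parquetInputFormat); // finite source
>>>> events.keyBy(e -> e.getKey())
>>>>         .timeWindow(Time.minutes(1))
>>>>         .reduce((a, b) -> a) // real aggregation omitted
>>>>         .addSink(parquetSink); // the StreamingFileSink from my first mail
>>>>
>>>> env.execute();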
>>>>
>>>> Let me know if I can provide any further information that could be
>>>> helpful.
>>>>
>>>> Would appreciate your help.
>>>>
>>>> Thanks,
>>>> Rafi
>>>>
>>>>
>>>> On Thu, Mar 21, 2019 at 5:20 PM Kostas Kloudas <kklou...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Rafi,
>>>>>
>>>>> Piotr is correct. In-progress files are not necessarily readable.
>>>>> The valid files are the ones that are "committed" or finalized.
>>>>>
>>>>> Cheers,
>>>>> Kostas
>>>>>
>>>>> On Thu, Mar 21, 2019 at 2:53 PM Piotr Nowojski <pi...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm not sure, but shouldn't you be reading just the committed files
>>>>>> and ignoring the in-progress ones? Maybe Kostas can add more insight
>>>>>> on this topic.
>>>>>>
>>>>>> Piotr Nowojski
>>>>>>
>>>>>> On 20 Mar 2019, at 12:23, Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm trying to stream events in Protobuf format into a Parquet file.
>>>>>> I looked into both streaming-file options: BucketingSink &
>>>>>> StreamingFileSink.
>>>>>> I first tried using the newer *StreamingFileSink* with the
>>>>>> *forBulkFormat* API. I noticed there's currently support only for the
>>>>>> Avro format, via *ParquetAvroWriters*.
>>>>>> I followed the same convention as Avro and wrote a
>>>>>> *ParquetProtoWriters* builder class:
>>>>>>
>>>>>> public class ParquetProtoWriters {
>>>>>>
>>>>>>     private static final int pageSize = 64 * 1024;
>>>>>>
>>>>>>     public static <T extends Message> ParquetWriterFactory<T> forType(final Class<T> protoClass) {
>>>>>>         final ParquetBuilder<T> builder = (out) -> createProtoParquetWriter(protoClass, out);
>>>>>>         return new ParquetWriterFactory<>(builder);
>>>>>>     }
>>>>>>
>>>>>>     private static <T extends Message> ParquetWriter<T> createProtoParquetWriter(
>>>>>>             Class<T> type,
>>>>>>             OutputFile out) throws IOException {
>>>>>>
>>>>>>         return ProtoParquetWriter.<T>builder(out)
>>>>>>                 .withPageSize(pageSize)
>>>>>>                 .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>                 .withCompressionCodec(CompressionCodecName.SNAPPY)
>>>>>>                 .withProtoClass(type)
>>>>>>                 .build();
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> And then I use it as follows:
>>>>>>
>>>>>> StreamingFileSink
>>>>>>         .forBulkFormat(new Path("some-path"), ParquetProtoWriters.forType(SomeProtoType.class))
>>>>>>         .build();
>>>>>>
>>>>>> I ran tests on *ParquetProtoWriters* itself and it writes everything
>>>>>> properly, and I'm able to read the files.
>>>>>>
>>>>>> When I use the sink as part of a job, *I see invalid Parquet files
>>>>>> created*:
>>>>>>
>>>>>> # parquet-tools cat .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea
>>>>>> .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea is not a Parquet file (too small length: 4)
>>>>>>
>>>>>>
>>>>>> Can anyone suggest what I am missing here?
>>>>>>
>>>>>> When trying to use the *BucketingSink*, I wrote a Writer class for
>>>>>> Protobuf and everything worked perfectly:
>>>>>>
>>>>>> public class FlinkProtoParquetWriter<T extends MessageOrBuilder> implements Writer<T> {
>>>>>>
>>>>>>     private static final long serialVersionUID = -975302556515811398L;
>>>>>>
>>>>>>     private Path path;
>>>>>>     private Class<? extends Message> protoClass;
>>>>>>     private transient ParquetWriter<T> writer;
>>>>>>
>>>>>>     private int position;
>>>>>>     private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
>>>>>>     private final int pageSize = 64 * 1024;
>>>>>>
>>>>>>     public FlinkProtoParquetWriter(Class<? extends Message> protoClass) {
>>>>>>         this.protoClass = protoClass;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void open(FileSystem fs, Path path) throws IOException {
>>>>>>         this.position = 0;
>>>>>>         this.path = path;
>>>>>>
>>>>>>         if (writer != null) {
>>>>>>             writer.close();
>>>>>>         }
>>>>>>
>>>>>>         writer = createWriter();
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public long flush() throws IOException {
>>>>>>         Preconditions.checkNotNull(writer);
>>>>>>         position += writer.getDataSize();
>>>>>>         writer.close();
>>>>>>         writer = createWriter();
>>>>>>
>>>>>>         return position;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public long getPos() {
>>>>>>         Preconditions.checkNotNull(writer);
>>>>>>         return position + writer.getDataSize();
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void close() throws IOException {
>>>>>>         if (writer != null) {
>>>>>>             writer.close();
>>>>>>             writer = null;
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void write(T element) throws IOException {
>>>>>>         Preconditions.checkNotNull(writer);
>>>>>>         writer.write(element);
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public Writer<T> duplicate() {
>>>>>>         return new FlinkProtoParquetWriter<>(protoClass);
>>>>>>     }
>>>>>>
>>>>>>     private ParquetWriter<T> createWriter() throws IOException {
>>>>>>         return ProtoParquetWriter
>>>>>>                 .<T>builder(path)
>>>>>>                 .withPageSize(pageSize)
>>>>>>                 .withCompressionCodec(compressionCodecName)
>>>>>>                 .withProtoClass(protoClass)
>>>>>>                 .build();
>>>>>>     }
>>>>>> }
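>>>>>>
>>>>>> Used roughly like this (the path and type are placeholders):
>>>>>>
>>>>>> BucketingSink<SomeProtoType> sink = new BucketingSink<>("s3://some-bucket/out");
>>>>>> sink.setWriter(new FlinkProtoParquetWriter<>(SomeProtoType.class));
>>>>>> stream.addSink(sink);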
>>>>>>
>>>>>>
>>>>>> Rafi
>>>>>>
>>>>>>
>>>>>>
>
> --
>
> Kostas Kloudas | Software Engineer
>
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>
