Hi Rafi,

Have you enabled checkpointing for you job?

Cheers,
Kostas

On Thu, Mar 21, 2019 at 5:18 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:

> Hi Piotr and Kostas,
>
> Thanks for your reply.
>
> The issue is that I don't see any committed files, only in-progress.
> I tried to debug the code for more details. I see that in *BulkPartWriter* I
> do reach the *write* methods and see events getting written, but I never
> reach the *closeForCommit*. I reach straight to the *close* function
> where all parts are disposed.
>
> In my job I have a finite stream (source is reading from parquet file/s).
> Doing some windowed aggregation and writing back to a parquet file.
> As far as I know, it should commit files during checkpoints and when the
> stream has finished. I did enabled checkpointing.
> I did verify that if I connect to other sinks, I see the events.
>
> Let me know if I can provide any further information that could be helpful.
>
> Would appreciate your help.
>
> Thanks,
> Rafi
>
>
> On Thu, Mar 21, 2019 at 5:20 PM Kostas Kloudas <kklou...@gmail.com> wrote:
>
>> Hi Rafi,
>>
>> Piotr is correct. In-progress files are not necessarily readable.
>> The valid files are the ones that are "committed" or finalized.
>>
>> Cheers,
>> Kostas
>>
>> On Thu, Mar 21, 2019 at 2:53 PM Piotr Nowojski <pi...@ververica.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I’m not sure, but shouldn’t you be just reading committed files and
>>> ignore in-progress? Maybe Kostas could add more insight to this topic.
>>>
>>> Piotr Nowojski
>>>
>>> On 20 Mar 2019, at 12:23, Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I'm trying to stream events in Prorobuf format into a parquet file.
>>> I looked into both streaming-file options: BucketingSink &
>>> StreamingFileSink.
>>> I first tried using the newer *StreamingFileSink* with the *forBulkFormat
>>> *API. I noticed there's currently support only for the Avro format with
>>> the *ParquetAvroWriters*.
>>> I followed the same convention as Avro and wrote a *ParquetProtoWriters* 
>>> builder
>>> class:
>>>
>>> public class ParquetProtoWriters {
>>>
>>>     private static final int pageSize = 64 * 1024;
>>>
>>>     public static <T extends Message> ParquetWriterFactory<T> forType(final 
>>> Class<T> protoClass) {
>>>         final ParquetBuilder<T> builder = (out) -> 
>>> createProtoParquetWriter(protoClass, out);
>>>         return new ParquetWriterFactory<>(builder);
>>>     }
>>>
>>>     private static <T extends Message> ParquetWriter<T> 
>>> createProtoParquetWriter(
>>>             Class<T> type,
>>>             OutputFile out) throws IOException {
>>>
>>>         return ProtoParquetWriter.<T>builder(out)
>>>                 .withPageSize(pageSize)
>>>                 .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>                 .withCompressionCodec(CompressionCodecName.SNAPPY)
>>>                 .withProtoClass(type)
>>>                 .build();
>>>     }
>>> }
>>>
>>> And then I use it as follows:
>>>
>>> StreamingFileSink
>>>         .forBulkFormat(new Path("some-path), 
>>> ParquetProtoWriters.forType(SomeProtoType.class))
>>>         .build();
>>>
>>> I ran tests on the *ParquetProtoWriters *itself and it writes
>>> everything properly and i'm able to read the files.
>>>
>>> When I use the sink as part of a job *I see illegal Parquet files
>>> created*:
>>>
>>> # parquet-tools cat 
>>> .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea
>>> .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea is not a Parquet 
>>> file (too small length: 4)
>>>
>>>
>>> Can anyone suggest what am I missing here?
>>>
>>> When trying to use the *BucketingSink*, I wrote a Writer class for
>>> Protobuf and everything worked perfectly:
>>>
>>> public class FlinkProtoParquetWriter<T extends MessageOrBuilder> implements 
>>> Writer<T> {
>>>
>>>     private static final long serialVersionUID = -975302556515811398L;
>>>
>>>     private Path path;
>>>     private Class<? extends Message> protoClass;
>>>     private transient ParquetWriter<T> writer;
>>>
>>>     private int position;
>>>     private final CompressionCodecName compressionCodecName = 
>>> CompressionCodecName.SNAPPY;
>>>     private final int pageSize = 64 * 1024;
>>>
>>>     public FlinkProtoParquetWriter(Class<? extends Message> protoClass) {
>>>         this.protoClass = protoClass;
>>>     }
>>>
>>>     @Override
>>>     public void open(FileSystem fs, Path path) throws IOException {
>>>         this.position = 0;
>>>         this.path = path;
>>>
>>>         if (writer != null) {
>>>             writer.close();
>>>         }
>>>
>>>         writer = createWriter();
>>>     }
>>>
>>>     @Override
>>>     public long flush() throws IOException {
>>>         Preconditions.checkNotNull(writer);
>>>         position += writer.getDataSize();
>>>         writer.close();
>>>         writer = createWriter();
>>>
>>>         return position;
>>>     }
>>>
>>>     @Override
>>>     public long getPos() {
>>>         Preconditions.checkNotNull(writer);
>>>         return position + writer.getDataSize();
>>>     }
>>>
>>>     @Override
>>>     public void close() throws IOException {
>>>         if (writer != null) {
>>>             writer.close();
>>>             writer = null;
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void write(T element) throws IOException {
>>>         Preconditions.checkNotNull(writer);
>>>         writer.write(element);
>>>     }
>>>
>>>     @Override
>>>     public Writer<T> duplicate() {
>>>         return new FlinkProtoParquetWriter<>(protoClass);
>>>     }
>>>
>>>     private ParquetWriter<T> createWriter() throws IOException {
>>>         return ProtoParquetWriter
>>>                 .<T>builder(path)
>>>                 .withPageSize(pageSize)
>>>                 .withCompressionCodec(compressionCodecName)
>>>                 .withProtoClass(protoClass)
>>>                 .build();
>>>     }
>>> }
>>>
>>>
>>> Rafi
>>>
>>>
>>>

Reply via email to