Hi Piotr and Kostas,

Thanks for your replies.

The issue is that I don't see any committed files, only in-progress ones.
I tried to debug the code for more details. I see that in *BulkPartWriter* I
do reach the *write* methods and see events getting written, but I never
reach *closeForCommit*. Instead, I go straight to the *close* function, where
all parts are disposed.

In my job I have a finite stream (the source reads from Parquet file/s),
doing some windowed aggregation and writing back to a Parquet file.
As far as I know, it should commit files during checkpoints and when the
stream has finished. I did enable checkpointing.
I did verify that if I connect other sinks, I see the events.
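
Roughly, the job is wired up like this (a simplified sketch; the checkpoint
interval, the output path, and names like SomeProtoType and
buildAggregatedStream are placeholders, not the actual code):

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpointing is enabled (1-minute interval)

// Placeholder for the bounded Parquet source plus the windowed aggregation.
DataStream<SomeProtoType> aggregated = buildAggregatedStream(env);

aggregated.addSink(
        StreamingFileSink
                .forBulkFormat(
                        new Path("file:///some-output-path"),
                        ParquetProtoWriters.forType(SomeProtoType.class))
                .build());

env.execute("proto-to-parquet");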

Let me know if I can provide any further information that could be helpful.

Would appreciate your help.

Thanks,
Rafi


On Thu, Mar 21, 2019 at 5:20 PM Kostas Kloudas <kklou...@gmail.com> wrote:

> Hi Rafi,
>
> Piotr is correct. In-progress files are not necessarily readable.
> The valid files are the ones that are "committed" or finalized.
>
> Cheers,
> Kostas
>
> On Thu, Mar 21, 2019 at 2:53 PM Piotr Nowojski <pi...@ververica.com>
> wrote:
>
>> Hi,
>>
>> I’m not sure, but shouldn’t you be just reading committed files and
>> ignoring in-progress ones? Maybe Kostas could add more insight to this topic.
>>
>> Piotr Nowojski
>>
>> On 20 Mar 2019, at 12:23, Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>
>> Hi,
>>
>> I'm trying to stream events in Protobuf format into a Parquet file.
>> I looked into both streaming-file options: BucketingSink &
>> StreamingFileSink.
>> I first tried using the newer *StreamingFileSink* with the *forBulkFormat*
>> API. I noticed there's currently support only for the Avro format with
>> the *ParquetAvroWriters*.
>> I followed the same convention as Avro and wrote a *ParquetProtoWriters*
>> builder class:
>>
>> public class ParquetProtoWriters {
>>
>>     private static final int pageSize = 64 * 1024;
>>
>>     public static <T extends Message> ParquetWriterFactory<T> forType(final Class<T> protoClass) {
>>         final ParquetBuilder<T> builder = (out) -> createProtoParquetWriter(protoClass, out);
>>         return new ParquetWriterFactory<>(builder);
>>     }
>>
>>     private static <T extends Message> ParquetWriter<T> createProtoParquetWriter(
>>             Class<T> type,
>>             OutputFile out) throws IOException {
>>
>>         return ProtoParquetWriter.<T>builder(out)
>>                 .withPageSize(pageSize)
>>                 .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>                 .withCompressionCodec(CompressionCodecName.SNAPPY)
>>                 .withProtoClass(type)
>>                 .build();
>>     }
>> }
>>
>> And then I use it as follows:
>>
>> StreamingFileSink
>>         .forBulkFormat(new Path("some-path"), ParquetProtoWriters.forType(SomeProtoType.class))
>>         .build();
>>
>> I ran tests on the *ParquetProtoWriters* itself and it writes everything
>> properly and I'm able to read the files.
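>>
>> A simplified sketch of that standalone test (the path and the proto type
>> are placeholders, not the actual test code):
>>
>> // Write a single message through the factory; the file can then be checked with parquet-tools.
>> ParquetWriterFactory<SomeProtoType> factory = ParquetProtoWriters.forType(SomeProtoType.class);
>> FileSystem fs = FileSystem.getLocalFileSystem();
>> try (FSDataOutputStream out =
>>         fs.create(new Path("file:///tmp/test.parquet"), FileSystem.WriteMode.OVERWRITE)) {
>>     BulkWriter<SomeProtoType> writer = factory.create(out);
>>     writer.addElement(SomeProtoType.newBuilder().build());
>>     writer.flush();
>>     writer.finish(); // writes the Parquet footer
>> }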
>>
>> When I use the sink as part of a job *I see illegal Parquet files
>> created*:
>>
>> # parquet-tools cat .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea
>> .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea is not a Parquet file (too small length: 4)
>>
>>
>> Can anyone suggest what I'm missing here?
>>
>> When trying to use the *BucketingSink*, I wrote a Writer class for
>> Protobuf and everything worked perfectly:
>>
>> public class FlinkProtoParquetWriter<T extends MessageOrBuilder> implements Writer<T> {
>>
>>     private static final long serialVersionUID = -975302556515811398L;
>>
>>     private Path path;
>>     private Class<? extends Message> protoClass;
>>     private transient ParquetWriter<T> writer;
>>
>>     private int position;
>>     private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
>>     private final int pageSize = 64 * 1024;
>>
>>     public FlinkProtoParquetWriter(Class<? extends Message> protoClass) {
>>         this.protoClass = protoClass;
>>     }
>>
>>     @Override
>>     public void open(FileSystem fs, Path path) throws IOException {
>>         this.position = 0;
>>         this.path = path;
>>
>>         if (writer != null) {
>>             writer.close();
>>         }
>>
>>         writer = createWriter();
>>     }
>>
>>     @Override
>>     public long flush() throws IOException {
>>         Preconditions.checkNotNull(writer);
>>         position += writer.getDataSize();
>>         writer.close();
>>         writer = createWriter();
>>
>>         return position;
>>     }
>>
>>     @Override
>>     public long getPos() {
>>         Preconditions.checkNotNull(writer);
>>         return position + writer.getDataSize();
>>     }
>>
>>     @Override
>>     public void close() throws IOException {
>>         if (writer != null) {
>>             writer.close();
>>             writer = null;
>>         }
>>     }
>>
>>     @Override
>>     public void write(T element) throws IOException {
>>         Preconditions.checkNotNull(writer);
>>         writer.write(element);
>>     }
>>
>>     @Override
>>     public Writer<T> duplicate() {
>>         return new FlinkProtoParquetWriter<>(protoClass);
>>     }
>>
>>     private ParquetWriter<T> createWriter() throws IOException {
>>         return ProtoParquetWriter
>>                 .<T>builder(path)
>>                 .withPageSize(pageSize)
>>                 .withCompressionCodec(compressionCodecName)
>>                 .withProtoClass(protoClass)
>>                 .build();
>>     }
>> }
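>>
>> For completeness, roughly how I plug it into the BucketingSink (a simplified
>> sketch; the path, the batch size, SomeProtoType and the stream variable are
>> placeholders, not the actual job code):
>>
>> BucketingSink<SomeProtoType> sink = new BucketingSink<>("file:///some-path");
>> sink.setWriter(new FlinkProtoParquetWriter<>(SomeProtoType.class));
>> sink.setBatchSize(128 * 1024 * 1024); // roll a new part file at ~128 MB (placeholder value)
>>
>> stream.addSink(sink);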
>>
>>
>> Rafi
>>
>>
>>
