Hi Rafi,

Piotr is correct: in-progress files are not necessarily readable, since a
Parquet footer is only written when a part file is closed. The valid files
are the ones that have been "committed", i.e. finalized.
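
Also note that with bulk formats the StreamingFileSink rolls its part files
on every checkpoint, so checkpointing must be enabled for in-progress files
to ever become finalized. A minimal sketch (the interval below is just an
example):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Bulk-encoded part files are committed on checkpoint completion;
// without checkpointing they stay in-progress indefinitely.
env.enableCheckpointing(60_000); // example: checkpoint every 60 seconds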

Cheers,
Kostas

On Thu, Mar 21, 2019 at 2:53 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> I’m not sure, but shouldn’t you be reading just the committed files and
> ignoring the in-progress ones? Maybe Kostas could add more insight on this
> topic.
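>
> Something along these lines when listing the output directory might do it
> (just a sketch; judging by the naming in your output below, in-progress
> files are hidden behind a leading dot and carry an "inprogress" marker):
>
> import java.io.File;
>
> // Keep only finalized part files; skip hidden / in-progress ones.
> File[] committed = new File("some-path").listFiles(
>         (dir, name) -> !name.startsWith(".") && !name.contains("inprogress"));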
>
> Piotr Nowojski
>
> On 20 Mar 2019, at 12:23, Rafi Aroch <rafi.ar...@gmail.com> wrote:
>
> Hi,
>
> I'm trying to stream events in Protobuf format into a Parquet file.
> I looked into both streaming-file options: BucketingSink &
> StreamingFileSink.
> I first tried using the newer *StreamingFileSink* with the *forBulkFormat*
> API. I noticed there's currently support only for the Avro format, via
> *ParquetAvroWriters*.
> I followed the same convention as Avro and wrote a *ParquetProtoWriters*
> builder class:
>
> import java.io.IOException;
>
> import com.google.protobuf.Message;
>
> import org.apache.flink.formats.parquet.ParquetBuilder;
> import org.apache.flink.formats.parquet.ParquetWriterFactory;
> import org.apache.parquet.hadoop.ParquetFileWriter;
> import org.apache.parquet.hadoop.ParquetWriter;
> import org.apache.parquet.hadoop.metadata.CompressionCodecName;
> import org.apache.parquet.io.OutputFile;
> import org.apache.parquet.proto.ProtoParquetWriter;
>
> public class ParquetProtoWriters {
>
>     private static final int PAGE_SIZE = 64 * 1024;
>
>     /** Creates a Flink ParquetWriterFactory for the given Protobuf class. */
>     public static <T extends Message> ParquetWriterFactory<T> forType(final Class<T> protoClass) {
>         final ParquetBuilder<T> builder = (out) -> createProtoParquetWriter(protoClass, out);
>         return new ParquetWriterFactory<>(builder);
>     }
>
>     private static <T extends Message> ParquetWriter<T> createProtoParquetWriter(
>             Class<T> type,
>             OutputFile out) throws IOException {
>
>         return ProtoParquetWriter.<T>builder(out)
>                 .withPageSize(PAGE_SIZE)
>                 .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>                 .withCompressionCodec(CompressionCodecName.SNAPPY)
>                 .withProtoClass(type)
>                 .build();
>     }
> }
>
> And then I use it as follows:
>
> StreamingFileSink
>         .forBulkFormat(new Path("some-path"), ParquetProtoWriters.forType(SomeProtoType.class))
>         .build();
>
> I ran tests on the *ParquetProtoWriters* itself; it writes everything
> properly and I'm able to read the files.
>
> When I use the sink as part of a job, *I see invalid Parquet files being
> created*:
>
> # parquet-tools cat .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea
> .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea is not a Parquet file (too small length: 4)
>
>
> Can anyone suggest what I'm missing here?
>
> When trying to use the *BucketingSink*, I wrote a Writer class for
> Protobuf and everything worked perfectly:
>
> import java.io.IOException;
>
> import com.google.protobuf.Message;
> import com.google.protobuf.MessageOrBuilder;
>
> import org.apache.flink.streaming.connectors.fs.Writer;
> import org.apache.flink.util.Preconditions;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.parquet.hadoop.ParquetWriter;
> import org.apache.parquet.hadoop.metadata.CompressionCodecName;
> import org.apache.parquet.proto.ProtoParquetWriter;
>
> public class FlinkProtoParquetWriter<T extends MessageOrBuilder> implements Writer<T> {
>
>     private static final long serialVersionUID = -975302556515811398L;
>
>     private Path path;
>     private Class<? extends Message> protoClass;
>     private transient ParquetWriter<T> writer;
>
>     // Bytes written by previously closed writers; the current writer's
>     // buffered size is added on top in getPos().
>     private int position;
>     private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
>     private final int pageSize = 64 * 1024;
>
>     public FlinkProtoParquetWriter(Class<? extends Message> protoClass) {
>         this.protoClass = protoClass;
>     }
>
>     @Override
>     public void open(FileSystem fs, Path path) throws IOException {
>         this.position = 0;
>         this.path = path;
>
>         if (writer != null) {
>             writer.close();
>         }
>
>         writer = createWriter();
>     }
>
>     @Override
>     public long flush() throws IOException {
>         // Parquet only materializes data when the file is closed,
>         // so close the current writer and recreate it.
>         Preconditions.checkNotNull(writer);
>         position += writer.getDataSize();
>         writer.close();
>         writer = createWriter();
>
>         return position;
>     }
>
>     @Override
>     public long getPos() {
>         Preconditions.checkNotNull(writer);
>         return position + writer.getDataSize();
>     }
>
>     @Override
>     public void close() throws IOException {
>         if (writer != null) {
>             writer.close();
>             writer = null;
>         }
>     }
>
>     @Override
>     public void write(T element) throws IOException {
>         Preconditions.checkNotNull(writer);
>         writer.write(element);
>     }
>
>     @Override
>     public Writer<T> duplicate() {
>         return new FlinkProtoParquetWriter<>(protoClass);
>     }
>
>     private ParquetWriter<T> createWriter() throws IOException {
>         return ProtoParquetWriter
>                 .<T>builder(path)
>                 .withPageSize(pageSize)
>                 .withCompressionCodec(compressionCodecName)
>                 .withProtoClass(protoClass)
>                 .build();
>     }
> }
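>
> And this is roughly how I plug it in (stream here is my DataStream of
> SomeProtoType):
>
> BucketingSink<SomeProtoType> sink = new BucketingSink<>("some-path");
> sink.setWriter(new FlinkProtoParquetWriter<>(SomeProtoType.class));
> stream.addSink(sink);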
>
>
> Rafi
>
>
>
