Hi Rafi,

Piotr is correct. In-progress files are not necessarily readable. The valid
files are the ones that are "committed" or finalized.
Cheers,
Kostas

On Thu, Mar 21, 2019 at 2:53 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> I’m not sure, but shouldn’t you be reading just the committed files and
> ignoring the in-progress ones? Maybe Kostas could add more insight on this
> topic.
>
> Piotr Nowojski
>
> On 20 Mar 2019, at 12:23, Rafi Aroch <rafi.ar...@gmail.com> wrote:
>
> Hi,
>
> I'm trying to stream events in Protobuf format into a Parquet file.
> I looked into both streaming-file options: BucketingSink &
> StreamingFileSink.
> I first tried using the newer *StreamingFileSink* with the *forBulkFormat*
> API. I noticed there's currently support only for the Avro format with
> the *ParquetAvroWriters*.
> I followed the same convention as Avro and wrote a *ParquetProtoWriters*
> builder class:
>
> public class ParquetProtoWriters {
>
>     private static final int pageSize = 64 * 1024;
>
>     public static <T extends Message> ParquetWriterFactory<T> forType(final Class<T> protoClass) {
>         final ParquetBuilder<T> builder = (out) -> createProtoParquetWriter(protoClass, out);
>         return new ParquetWriterFactory<>(builder);
>     }
>
>     private static <T extends Message> ParquetWriter<T> createProtoParquetWriter(
>             Class<T> type,
>             OutputFile out) throws IOException {
>
>         return ProtoParquetWriter.<T>builder(out)
>                 .withPageSize(pageSize)
>                 .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>                 .withCompressionCodec(CompressionCodecName.SNAPPY)
>                 .withProtoClass(type)
>                 .build();
>     }
> }
>
> And then I use it as follows:
>
> StreamingFileSink
>     .forBulkFormat(new Path("some-path"), ParquetProtoWriters.forType(SomeProtoType.class))
>     .build();
>
> I ran tests on *ParquetProtoWriters* itself, and it writes everything
> properly and I'm able to read the files.
>
> When I use the sink as part of a job, *I see illegal Parquet files created*:
>
> # parquet-tools cat .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea
> .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea is not a Parquet
> file (too small length: 4)
>
> Can anyone suggest what I am missing here?
>
> When trying to use the *BucketingSink*, I wrote a Writer class for
> Protobuf and everything worked perfectly:
>
> public class FlinkProtoParquetWriter<T extends MessageOrBuilder> implements Writer<T> {
>
>     private static final long serialVersionUID = -975302556515811398L;
>
>     private Path path;
>     private Class<? extends Message> protoClass;
>     private transient ParquetWriter<T> writer;
>
>     private int position;
>     private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
>     private final int pageSize = 64 * 1024;
>
>     public FlinkProtoParquetWriter(Class<? extends Message> protoClass) {
>         this.protoClass = protoClass;
>     }
>
>     @Override
>     public void open(FileSystem fs, Path path) throws IOException {
>         this.position = 0;
>         this.path = path;
>
>         if (writer != null) {
>             writer.close();
>         }
>
>         writer = createWriter();
>     }
>
>     @Override
>     public long flush() throws IOException {
>         Preconditions.checkNotNull(writer);
>         position += writer.getDataSize();
>         writer.close();
>         writer = createWriter();
>
>         return position;
>     }
>
>     @Override
>     public long getPos() {
>         Preconditions.checkNotNull(writer);
>         return position + writer.getDataSize();
>     }
>
>     @Override
>     public void close() throws IOException {
>         if (writer != null) {
>             writer.close();
>             writer = null;
>         }
>     }
>
>     @Override
>     public void write(T element) throws IOException {
>         Preconditions.checkNotNull(writer);
>         writer.write(element);
>     }
>
>     @Override
>     public Writer<T> duplicate() {
>         return new FlinkProtoParquetWriter<>(protoClass);
>     }
>
>     private ParquetWriter<T> createWriter() throws IOException {
>         return ProtoParquetWriter
>                 .<T>builder(path)
>                 .withPageSize(pageSize)
>                 .withCompressionCodec(compressionCodecName)
>                 .withProtoClass(protoClass)
>                 .build();
>     }
> }
>
>
> Rafi
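As a minimal illustration of Kostas' point, a downstream reader could skip the non-finalized part files by name. This is only a sketch under an assumption: it relies on the default StreamingFileSink naming visible in the error output above, where in-progress files are hidden (leading dot) and carry an `.inprogress.` marker; the class and method names here are illustrative, not from any library.

```java
// Sketch: distinguish committed part files from in-progress ones by filename.
// Assumes the default StreamingFileSink naming seen in this thread, e.g.
// ".part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea" (in-progress)
// vs. "part-0-0" (committed).
public final class CommittedFileFilter {

    /** Returns true only for finalized ("committed") part files. */
    public static boolean isCommitted(String fileName) {
        return !fileName.startsWith(".")              // hidden => not yet finalized
                && !fileName.contains(".inprogress.") // explicit in-progress marker
                && fileName.startsWith("part-");      // default committed prefix
    }

    public static void main(String[] args) {
        System.out.println(isCommitted("part-0-0"));  // true
        System.out.println(isCommitted(
                ".part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea")); // false
    }
}
```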
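For context on the "too small length: 4" message: a valid Parquet file begins and ends with the 4-byte magic `PAR1` and must be at least 12 bytes long (header magic, 4-byte footer length, footer magic), so an in-progress file that was only opened, never finalized, holds just the 4-byte header and is rejected by any footer-based reader. A hedged sketch of that structural check, using only the standard library (the class and method names are illustrative):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch: the structural minimum a Parquet reader requires before it can even
// locate the footer. A 4-byte in-progress file (header magic only) fails the
// size check, which matches the "too small length: 4" error in this thread.
public final class ParquetSanityCheck {

    private static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

    /** True if the file is structurally complete: header magic, room for a footer, footer magic. */
    public static boolean looksComplete(Path file) throws IOException {
        byte[] bytes = Files.readAllBytes(file);
        if (bytes.length < MAGIC.length + 4 + MAGIC.length) {
            return false; // the "too small length" condition
        }
        for (int i = 0; i < MAGIC.length; i++) {
            if (bytes[i] != MAGIC[i]) {
                return false; // header magic mismatch
            }
            if (bytes[bytes.length - MAGIC.length + i] != MAGIC[i]) {
                return false; // footer magic mismatch
            }
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path inProgress = Files.createTempFile("part-", ".tmp");
        Files.write(inProgress, MAGIC); // 4 bytes: header only, like the file in the thread
        System.out.println(looksComplete(inProgress)); // false
    }
}
```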