Hi Piotr and Kostas,

Thanks for your reply.

The issue is that I don't see any committed files, only in-progress ones. I tried to debug the code for more details: in *BulkPartWriter* I do reach the *write* methods and see events getting written, but I never reach *closeForCommit*. Instead I go straight to the *close* function, where all parts are disposed.

In my job I have a finite stream (the source is reading from Parquet file/s), doing some windowed aggregation and writing back to a Parquet file. As far as I know, the sink should commit files on checkpoints and when the stream has finished. I did enable checkpointing, and I did verify that if I connect to other sinks, I see the events.
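For context, the job is wired roughly like this (a simplified sketch; the source, the key selector and the reduce function below are placeholders for the real ones):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000); // checkpointing is enabled, so I'd expect commits on every checkpoint

env.addSource(parquetSource)                // placeholder: the finite source reading the Parquet file/s
        .keyBy(e -> e.getSomeKey())         // placeholder key selector
        .timeWindow(Time.minutes(1))        // the windowed aggregation (window size is illustrative)
        .reduce(someReduceFunction)         // placeholder reduce function
        .addSink(StreamingFileSink
                .forBulkFormat(new Path("some-path"), ParquetProtoWriters.forType(SomeProtoType.class))
                .build());

env.execute();

The sink itself is built exactly as in my original message below.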
Let me know if I can provide any further information that could be helpful. Would appreciate your help.

Thanks,
Rafi


On Thu, Mar 21, 2019 at 5:20 PM Kostas Kloudas <kklou...@gmail.com> wrote:

> Hi Rafi,
>
> Piotr is correct. In-progress files are not necessarily readable.
> The valid files are the ones that are "committed" or finalized.
>
> Cheers,
> Kostas
>
> On Thu, Mar 21, 2019 at 2:53 PM Piotr Nowojski <pi...@ververica.com> wrote:
>
>> Hi,
>>
>> I'm not sure, but shouldn't you be reading just the committed files and
>> ignoring the in-progress ones? Maybe Kostas could add more insight to this topic.
>>
>> Piotr Nowojski
>>
>> On 20 Mar 2019, at 12:23, Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>
>> Hi,
>>
>> I'm trying to stream events in Protobuf format into a Parquet file.
>> I looked into both streaming-file options: BucketingSink & StreamingFileSink.
>> I first tried using the newer *StreamingFileSink* with the *forBulkFormat* API. I noticed there's currently support only for the Avro format, with the *ParquetAvroWriters*.
>> I followed the same convention as Avro and wrote a *ParquetProtoWriters* builder class:
>>
>> public class ParquetProtoWriters {
>>
>>     private static final int pageSize = 64 * 1024;
>>
>>     public static <T extends Message> ParquetWriterFactory<T> forType(final Class<T> protoClass) {
>>         final ParquetBuilder<T> builder = (out) -> createProtoParquetWriter(protoClass, out);
>>         return new ParquetWriterFactory<>(builder);
>>     }
>>
>>     private static <T extends Message> ParquetWriter<T> createProtoParquetWriter(
>>             Class<T> type,
>>             OutputFile out) throws IOException {
>>
>>         return ProtoParquetWriter.<T>builder(out)
>>                 .withPageSize(pageSize)
>>                 .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>                 .withCompressionCodec(CompressionCodecName.SNAPPY)
>>                 .withProtoClass(type)
>>                 .build();
>>     }
>> }
>>
>> And then I use it as follows:
>>
>> StreamingFileSink
>>         .forBulkFormat(new Path("some-path"), ParquetProtoWriters.forType(SomeProtoType.class))
>>         .build();
>>
>> I ran tests on *ParquetProtoWriters* itself, and it writes everything properly and I'm able to read the files.
>>
>> When I use the sink as part of a job, *I see illegal Parquet files created*:
>>
>> # parquet-tools cat .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea
>> .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea is not a Parquet file (too small length: 4)
>>
>> Can anyone suggest what I'm missing here?
>>
>> When trying to use the *BucketingSink*, I wrote a Writer class for Protobuf and everything worked perfectly:
>>
>> public class FlinkProtoParquetWriter<T extends MessageOrBuilder> implements Writer<T> {
>>
>>     private static final long serialVersionUID = -975302556515811398L;
>>
>>     private Path path;
>>     private Class<? extends Message> protoClass;
>>     private transient ParquetWriter<T> writer;
>>
>>     private int position;
>>     private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
>>     private final int pageSize = 64 * 1024;
>>
>>     public FlinkProtoParquetWriter(Class<? extends Message> protoClass) {
>>         this.protoClass = protoClass;
>>     }
>>
>>     @Override
>>     public void open(FileSystem fs, Path path) throws IOException {
>>         this.position = 0;
>>         this.path = path;
>>
>>         if (writer != null) {
>>             writer.close();
>>         }
>>
>>         writer = createWriter();
>>     }
>>
>>     @Override
>>     public long flush() throws IOException {
>>         Preconditions.checkNotNull(writer);
>>         position += writer.getDataSize();
>>         writer.close();
>>         writer = createWriter();
>>
>>         return position;
>>     }
>>
>>     @Override
>>     public long getPos() {
>>         Preconditions.checkNotNull(writer);
>>         return position + writer.getDataSize();
>>     }
>>
>>     @Override
>>     public void close() throws IOException {
>>         if (writer != null) {
>>             writer.close();
>>             writer = null;
>>         }
>>     }
>>
>>     @Override
>>     public void write(T element) throws IOException {
>>         Preconditions.checkNotNull(writer);
>>         writer.write(element);
>>     }
>>
>>     @Override
>>     public Writer<T> duplicate() {
>>         return new FlinkProtoParquetWriter<>(protoClass);
>>     }
>>
>>     private ParquetWriter<T> createWriter() throws IOException {
>>         return ProtoParquetWriter
>>                 .<T>builder(path)
>>                 .withPageSize(pageSize)
>>                 .withCompressionCodec(compressionCodecName)
>>                 .withProtoClass(protoClass)
>>                 .build();
>>     }
>> }
>>
>> Rafi
>>
>>