Hi Rafi,

Have you enabled checkpointing for your job?
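For reference, checkpointing is enabled on the execution environment. A minimal sketch (the 60-second interval is just an example value):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // StreamingFileSink finalizes ("commits") in-progress part files as part of
    // each successful checkpoint, so without checkpointing nothing gets committed.
    env.enableCheckpointing(60_000);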
Cheers,
Kostas

On Thu, Mar 21, 2019 at 5:18 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:

Hi Piotr and Kostas,

Thanks for your reply.

The issue is that I don't see any committed files, only in-progress ones.
I tried to debug the code for more details. I see that in *BulkPartWriter* I do reach the *write* methods and see events getting written, but I never reach *closeForCommit*. I go straight to the *close* function, where all parts are disposed.

In my job I have a finite stream (the source reads from Parquet file/s), do some windowed aggregation, and write back to a Parquet file.
As far as I know, it should commit files during checkpoints and when the stream has finished. I did enable checkpointing.
I verified that if I connect other sinks, I do see the events.

Let me know if I can provide any further information that could be helpful.

Would appreciate your help.

Thanks,
Rafi


On Thu, Mar 21, 2019 at 5:20 PM Kostas Kloudas <kklou...@gmail.com> wrote:

Hi Rafi,

Piotr is correct. In-progress files are not necessarily readable.
The valid files are the ones that are "committed" or finalized.

Cheers,
Kostas

On Thu, Mar 21, 2019 at 2:53 PM Piotr Nowojski <pi...@ververica.com> wrote:

Hi,

I'm not sure, but shouldn't you be reading just the committed files and ignoring the in-progress ones? Maybe Kostas could add more insight on this topic.

Piotr Nowojski

On 20 Mar 2019, at 12:23, Rafi Aroch <rafi.ar...@gmail.com> wrote:

Hi,

I'm trying to stream events in Protobuf format into a Parquet file.
I looked into both streaming-file options: BucketingSink & StreamingFileSink.

I first tried using the newer *StreamingFileSink* with the *forBulkFormat* API. I noticed there's currently support only for the Avro format, via *ParquetAvroWriters*.
I followed the same convention as Avro and wrote a *ParquetProtoWriters* builder class:

    import com.google.protobuf.Message;
    import org.apache.flink.formats.parquet.ParquetBuilder;
    import org.apache.flink.formats.parquet.ParquetWriterFactory;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.io.OutputFile;
    import org.apache.parquet.proto.ProtoParquetWriter;

    import java.io.IOException;

    public class ParquetProtoWriters {

        private static final int pageSize = 64 * 1024;

        public static <T extends Message> ParquetWriterFactory<T> forType(final Class<T> protoClass) {
            final ParquetBuilder<T> builder = (out) -> createProtoParquetWriter(protoClass, out);
            return new ParquetWriterFactory<>(builder);
        }

        private static <T extends Message> ParquetWriter<T> createProtoParquetWriter(
                Class<T> type,
                OutputFile out) throws IOException {

            return ProtoParquetWriter.<T>builder(out)
                    .withPageSize(pageSize)
                    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                    .withCompressionCodec(CompressionCodecName.SNAPPY)
                    .withProtoClass(type)
                    .build();
        }
    }

And then I use it as follows:

    StreamingFileSink
        .forBulkFormat(new Path("some-path"), ParquetProtoWriters.forType(SomeProtoType.class))
        .build();

I ran tests on *ParquetProtoWriters* itself; it writes everything properly and I'm able to read the files.

When I use the sink as part of a job, *I see illegal Parquet files created*:

    # parquet-tools cat .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea
    .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea is not a Parquet file (too small length: 4)

Can anyone suggest what I'm missing here?
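(For reference: a valid Parquet file both starts and ends with the 4-byte "PAR1" magic, and the footer plus trailing magic are only written when the writer is closed. That matches the "too small length: 4" above: the in-progress file holds nothing but the header magic. A minimal, hypothetical check for a finalized file, not part of Flink or Parquet:)

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public final class ParquetMagicCheck {

        private static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

        // Returns true only if the file ends with the trailing "PAR1" magic
        // that ParquetWriter writes after the footer on close().
        public static boolean hasClosedParquetFooter(String path) throws IOException {
            try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
                if (file.length() < 2L * MAGIC.length) {
                    return false; // too small to hold header and trailer magic
                }
                byte[] trailer = new byte[MAGIC.length];
                file.seek(file.length() - MAGIC.length);
                file.readFully(trailer);
                return Arrays.equals(trailer, MAGIC);
            }
        }
    }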
When trying to use the *BucketingSink*, I wrote a Writer class for Protobuf and everything worked perfectly (a usage sketch follows after the class):

    import com.google.protobuf.Message;
    import com.google.protobuf.MessageOrBuilder;
    import org.apache.flink.streaming.connectors.fs.Writer;
    import org.apache.flink.util.Preconditions;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.proto.ProtoParquetWriter;

    import java.io.IOException;

    public class FlinkProtoParquetWriter<T extends MessageOrBuilder> implements Writer<T> {

        private static final long serialVersionUID = -975302556515811398L;

        private Path path;
        private Class<? extends Message> protoClass;
        private transient ParquetWriter<T> writer;

        // Bytes written by already-closed writers; getPos() adds the
        // current writer's buffered size on top of this.
        private int position;
        private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
        private final int pageSize = 64 * 1024;

        public FlinkProtoParquetWriter(Class<? extends Message> protoClass) {
            this.protoClass = protoClass;
        }

        @Override
        public void open(FileSystem fs, Path path) throws IOException {
            this.position = 0;
            this.path = path;

            if (writer != null) {
                writer.close();
            }

            writer = createWriter();
        }

        @Override
        public long flush() throws IOException {
            // ParquetWriter only guarantees data is on disk after close(),
            // so flushing means closing the current writer and opening a new one.
            Preconditions.checkNotNull(writer);
            position += writer.getDataSize();
            writer.close();
            writer = createWriter();

            return position;
        }

        @Override
        public long getPos() {
            Preconditions.checkNotNull(writer);
            return position + writer.getDataSize();
        }

        @Override
        public void close() throws IOException {
            if (writer != null) {
                writer.close();
                writer = null;
            }
        }

        @Override
        public void write(T element) throws IOException {
            Preconditions.checkNotNull(writer);
            writer.write(element);
        }

        @Override
        public Writer<T> duplicate() {
            return new FlinkProtoParquetWriter<>(protoClass);
        }

        private ParquetWriter<T> createWriter() throws IOException {
            return ProtoParquetWriter
                    .<T>builder(path)
                    .withPageSize(pageSize)
                    .withCompressionCodec(compressionCodecName)
                    .withProtoClass(protoClass)
                    .build();
        }
    }

Rafi
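P.S. For completeness, the usage sketch mentioned above, showing how the writer plugs into the sink (SomeProtoType and the base path are placeholders):

    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

    BucketingSink<SomeProtoType> sink =
            new BucketingSink<SomeProtoType>("/some-base-path")
                    .setWriter(new FlinkProtoParquetWriter<>(SomeProtoType.class));
    stream.addSink(sink);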