What's the reason for not considering the WAL based approach?

What are the pros and cons?


On Tue, Jun 14, 2016 at 6:54 PM, Devendra Tagare <[email protected]>
wrote:

> Hi All,
>
> We can focus on the below 2 problems,
> 1.Avoid the small files problem which could arise due a flush at every
> endWindow, since there wouldn't be significant data in a window.
> 2.Fault Tolerance.
>
> *Proposal* : Create a module in which there are 2 operators,
>
> *Operator 1 : ParquetFileOutputOperator*
> This operator will be an implementation of the AbstractFileOutputOperator.
> It will write data to a HDFS location and leverage the fault-tolerance
> semantics of the AbstractFileOutputOperator.
>
> This operator will implement the CheckpointNotificationListener and will
> emit the finalizedFiles from the beforeCheckpoint method.
> Map<windowId,Set<files finalized in the window>>
>
> *Operator 2 : ParquetFileWriter*
> This operator will receive a Set<files finalized in the window> from the
> ParquetFileOutputOperator on its input port.
> Once it receives this map, it will do the below things,
>
> 1.Save the input received to a Map<windowId,Set<InputFiles>> inputFilesMap
>
> 2.Instantiate a new ParquetWriter
>   2.a. Get a unique file name.
>   2.b. Add a configurable writer that extends the ParquetWriter and include
> a write support for writing various supported formats like Avro,thrift etc.
>
> 3.For each file from the inputFilesMap,
>   3.a Read the file and write the record using the writer created in (2)
>   3.b Check if the block size (configurable) is reached.If yes then close
> the file and add its entry to a
> Map<windowId,CompletedFiles>completedFilesMap.Remove the entry from
> inputFilesMap.
>         If the writes fail then the files can be reprocessed from the
> inputFilesMap.
> 3.c In the committed callback remove the completed files from the directory
> and prune the completedFilesMap for that window.
>
> Points to note,
> 1.The block size check will be approximate since the data is in memory and
> ParquetWriter does not expose a flush.
> 2.This is at best a temporary implementation in the absence of a WAL based
> approach.
>
> I would like to take a crack at this operator based on community feedback.
>
> Thoughts ?
>
> Thanks,
> Dev
>
>
>
>
>
>
>
>
>
>
>
>
> On Mon, Apr 25, 2016 at 12:36 PM, Tushar Gosavi <[email protected]>
> wrote:
>
> > Hi Shubham,
> >
> > +1 for the Parquet  writer.
> >
> > I doubt if we could leverage on recovery mechanism provided by
> > AbstractFileOutputOperator as Parquet Writer does not expose flush, and
> > could write to underline stream at any time. To simplify recovery you can
> > write a single file in each checkpoint duration. If this is not an
> option,
> > then
> > you need to make use of WAL for recovery, and not use operator
> > check-pointing for storing not persisted tuples, as checkpointing huge
> > state every 30 seconds is costly.
> >
> > Regards,
> > -Tushar.
> >
> >
> > On Mon, Apr 25, 2016 at 11:38 PM, Shubham Pathak <
> [email protected]>
> > wrote:
> >
> > > Hello Community,
> > >
> > > Apache Parquet <https://parquet.apache.org/documentation/latest/> is a
> > > columnar oriented binary file format designed to be extremely efficient
> > and
> > > interoperable across Hadoop ecosystem. It has integrations with most of
> > the
> > > Hadoop processing frameworks ( Impala, Hive, Pig, Spark.. ) and
> > > serialization models (Thrift, Avro, Protobuf)  making it easy to use in
> > ETL
> > > and processing pipelines.
> > >
> > > Having an operator to write data to Parquet files would certainly be a
> > good
> > > addition to the Malhar library.
> > >
> > > The underlying implementation
> > > <
> > >
> >
> http://blog.cloudera.com/blog/2014/05/how-to-convert-existing-data-into-parquet/
> > > >
> > > for writing data as Parquet, requires a subclass of
> > > parquet.hadoop.api.WriteSupport that knows how to take an in-memory
> > object
> > > and write Parquet primitives through parquet.io.api.RecordConsumer*.*
> > > Currently, there are several WriteSupport implementations, including
> > > ThriftWriteSupport,
> > > AvroWriteSupport, and ProtoWriteSupport.
> > > These WriteSupport implementations are then wrapped as ParquetWriter
> > > objects for writing.
> > >
> > > Parquet Writers do not expose a handle to the underlying stream. In
> order
> > > to  write data to a Parquet file, all the records ( that belong to
> file )
> > > must be buffered in memory. These records are then compressed and later
> > > flushed to the file.
> > >
> > > To start with, we could support following features in the operator
> > >
> > >    - *Ability to provide a WriteSupport Implementation* : The user
> should
> > >    be able to use existing implementations of parquet.hadoop.api.
> > >    WriteSupport or provide his/her own implementation.
> > >    - *Ability to configure Page Size : *Refers to the amount of
> > >    uncompressed data for a single column that is read before it is
> > > compressed
> > >    as a unit and buffered in memory to be written out as a “page”.
> > Default
> > >    value : 1MB
> > >    - *Ability to configure Parquet Block Size : *Refers to the amount
> of
> > >    compressed data that should be buffered in memory before a row group
> > is
> > >    written out to disk. Larger block sizes require more memory to
> buffer
> > > the
> > >    data; Recommended is 128 MB / 256 MB
> > >    - *Flushing files periodically* :Operator would have to flush files
> > >    periodically in a specified directory as per configured block size .
> > > This
> > >    could be time-based / number of events based  / size based
> > >
> > > To implement the operator, here's one approach  :
> > >
> > >    1. Extend existing AbstractFileOutputOperator
> > >    2.  Provide methods to add write support implementations.
> > >    3. In process method, hold the data in memory till we reach a
> > configured
> > >    size and then flush  the contents to a file during endWindow().
> > >
> > > Please send across your thoughts on this. I would also like to know if
> we
> > > would be able to leverage recovery mechanisms provided by
> > > AbstractFileOutputOperator using this approach?
> > >
> > >
> > > Thanks,
> > > Shubham
> > >
> >
>

Reply via email to