What's the reason for not considering the WAL-based approach? What are the pros and cons?
On Tue, Jun 14, 2016 at 6:54 PM, Devendra Tagare <[email protected]> wrote:

> Hi All,
>
> We can focus on the below 2 problems:
> 1. Avoid the small-files problem which could arise due to a flush at every
> endWindow, since there wouldn't be significant data in a window.
> 2. Fault tolerance.
>
> *Proposal*: Create a module in which there are 2 operators.
>
> *Operator 1: ParquetFileOutputOperator*
> This operator will be an implementation of the AbstractFileOutputOperator.
> It will write data to an HDFS location and leverage the fault-tolerance
> semantics of the AbstractFileOutputOperator.
>
> This operator will implement CheckpointNotificationListener and will
> emit the finalized files from the beforeCheckpoint method as a
> Map<windowId, Set<files finalized in the window>>.
>
> *Operator 2: ParquetFileWriter*
> This operator will receive a Set<files finalized in the window> from the
> ParquetFileOutputOperator on its input port.
> Once it receives this map, it will do the below things:
>
> 1. Save the input received to a Map<windowId, Set<InputFiles>> inputFilesMap.
>
> 2. Instantiate a new ParquetWriter.
> 2.a. Get a unique file name.
> 2.b. Add a configurable writer that extends the ParquetWriter and includes
> write support for writing the various supported formats like Avro, Thrift, etc.
>
> 3. For each file from the inputFilesMap:
> 3.a. Read the file and write the records using the writer created in (2).
> 3.b. Check if the (configurable) block size is reached. If yes, then close
> the file, add its entry to a Map<windowId, CompletedFiles> completedFilesMap,
> and remove the entry from inputFilesMap.
> If the writes fail, the files can be reprocessed from the inputFilesMap.
> 3.c. In the committed callback, remove the completed files from the directory
> and prune the completedFilesMap for that window.
>
> Points to note:
> 1. The block size check will be approximate, since the data is in memory and
> ParquetWriter does not expose a flush.
> 2. This is at best a temporary implementation in the absence of a WAL-based
> approach.
>
> I would like to take a crack at this operator based on community feedback.
>
> Thoughts?
>
> Thanks,
> Dev
>
> On Mon, Apr 25, 2016 at 12:36 PM, Tushar Gosavi <[email protected]>
> wrote:
>
> > Hi Shubham,
> >
> > +1 for the Parquet writer.
> >
> > I doubt if we could leverage the recovery mechanism provided by
> > AbstractFileOutputOperator, as the Parquet writer does not expose flush
> > and could write to the underlying stream at any time. To simplify
> > recovery you can write a single file in each checkpoint duration. If
> > this is not an option, then you need to make use of a WAL for recovery,
> > and not use operator checkpointing for storing not-yet-persisted tuples,
> > as checkpointing huge state every 30 seconds is costly.
> >
> > Regards,
> > -Tushar.
> >
> > On Mon, Apr 25, 2016 at 11:38 PM, Shubham Pathak <[email protected]>
> > wrote:
> >
> > > Hello Community,
> > >
> > > Apache Parquet <https://parquet.apache.org/documentation/latest/> is a
> > > column-oriented binary file format designed to be extremely efficient
> > > and interoperable across the Hadoop ecosystem. It has integrations with
> > > most of the Hadoop processing frameworks (Impala, Hive, Pig, Spark, ...)
> > > and serialization models (Thrift, Avro, Protobuf), making it easy to use
> > > in ETL and processing pipelines.
> > >
> > > Having an operator to write data to Parquet files would certainly be a
> > > good addition to the Malhar library.
> > > The underlying implementation
> > > <http://blog.cloudera.com/blog/2014/05/how-to-convert-existing-data-into-parquet/>
> > > for writing data as Parquet requires a subclass of
> > > parquet.hadoop.api.WriteSupport that knows how to take an in-memory
> > > object and write Parquet primitives through parquet.io.api.RecordConsumer.
> > > Currently, there are several WriteSupport implementations, including
> > > ThriftWriteSupport, AvroWriteSupport, and ProtoWriteSupport.
> > > These WriteSupport implementations are then wrapped as ParquetWriter
> > > objects for writing.
> > >
> > > Parquet writers do not expose a handle to the underlying stream. In
> > > order to write data to a Parquet file, all the records (that belong to
> > > the file) must be buffered in memory. These records are then compressed
> > > and later flushed to the file.
> > >
> > > To start with, we could support the following features in the operator:
> > >
> > > - *Ability to provide a WriteSupport implementation*: The user should
> > > be able to use existing implementations of parquet.hadoop.api.WriteSupport
> > > or provide his/her own implementation.
> > > - *Ability to configure page size*: Refers to the amount of
> > > uncompressed data for a single column that is read before it is
> > > compressed as a unit and buffered in memory to be written out as a
> > > "page". Default value: 1 MB.
> > > - *Ability to configure Parquet block size*: Refers to the amount of
> > > compressed data that should be buffered in memory before a row group is
> > > written out to disk. Larger block sizes require more memory to buffer
> > > the data; recommended is 128 MB / 256 MB.
> > > - *Flushing files periodically*: The operator would have to flush files
> > > periodically to a specified directory as per the configured block size.
> > > This could be time-based, number-of-events-based, or size-based.
> > >
> > > To implement the operator, here's one approach:
> > >
> > > 1. Extend the existing AbstractFileOutputOperator.
> > > 2. Provide methods to add WriteSupport implementations.
> > > 3. In the process method, hold the data in memory till we reach a
> > > configured size, and then flush the contents to a file during endWindow().
> > >
> > > Please send across your thoughts on this. I would also like to know if
> > > we would be able to leverage the recovery mechanisms provided by
> > > AbstractFileOutputOperator using this approach?
> > >
> > > Thanks,
> > > Shubham
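As a rough illustration of the buffer-and-flush approach Shubham describes above (hold tuples in memory until a configured size, then flush during endWindow()), here is a minimal, self-contained sketch. The class and method names are illustrative only, not part of Malhar or parquet-mr, and the actual handoff to a ParquetWriter is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not Malhar code): buffer serialized records in
// memory and signal when the configured size threshold is reached, since
// a ParquetWriter exposes no flush of its own.
class BufferingSketch {
    private final long thresholdBytes;           // configured flush threshold
    private final List<byte[]> buffer = new ArrayList<>();
    private long bufferedBytes = 0;

    BufferingSketch(long thresholdBytes) {
        this.thresholdBytes = thresholdBytes;
    }

    // Would be called from the operator's process(tuple): buffer the record
    // and return true once the accumulated size suggests rolling the file.
    boolean add(byte[] record) {
        buffer.add(record);
        bufferedBytes += record.length;
        return bufferedBytes >= thresholdBytes;
    }

    // Would be called from endWindow() once the threshold is hit: hand the
    // buffered records to a Parquet writer (omitted) and reset the buffer.
    List<byte[]> drain() {
        List<byte[]> out = new ArrayList<>(buffer);
        buffer.clear();
        bufferedBytes = 0;
        return out;
    }

    long bufferedBytes() {
        return bufferedBytes;
    }
}
```

As Dev points out at the top of the thread, any such check is approximate: the threshold here is computed from uncompressed in-memory record sizes, while the Parquet block size refers to compressed row-group data.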

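The window-indexed bookkeeping in the two-operator proposal at the top of the thread (inputFilesMap, completedFilesMap, and pruning in the committed callback) might look roughly like the following. This is a sketch with illustrative names only; a real ParquetFileWriter would implement the Apex operator callbacks and perform the actual Parquet rewriting.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Illustrative sketch (not Malhar code) of ParquetFileWriter's maps:
// track input files per window, move them to a completed map once
// rewritten as Parquet, and prune completed records on commit.
class WindowFileTracker {
    // windowId -> finalized input files still to be rewritten as Parquet
    private final Map<Long, Set<String>> inputFilesMap = new TreeMap<>();
    // windowId -> Parquet files completed in that window
    private final Map<Long, Set<String>> completedFilesMap = new TreeMap<>();

    // Upstream operator emitted its finalized files for this window.
    void onFinalizedFiles(long windowId, Set<String> files) {
        inputFilesMap.computeIfAbsent(windowId, w -> new HashSet<>()).addAll(files);
    }

    // A source file has been fully rewritten as a Parquet file. Files left
    // in inputFilesMap can be reprocessed if a write fails.
    void markCompleted(long windowId, String sourceFile, String parquetFile) {
        Set<String> pending = inputFilesMap.get(windowId);
        if (pending != null && pending.remove(sourceFile)) {
            completedFilesMap.computeIfAbsent(windowId, w -> new HashSet<>()).add(parquetFile);
            if (pending.isEmpty()) {
                inputFilesMap.remove(windowId);
            }
        }
    }

    // Committed callback: windows up to windowId are durable, so the source
    // files can be deleted and the completed-file records pruned.
    void committed(long windowId) {
        completedFilesMap.keySet().removeIf(w -> w <= windowId);
    }

    boolean hasPending(long windowId) {
        return inputFilesMap.containsKey(windowId);
    }

    int completedWindowCount() {
        return completedFilesMap.size();
    }
}
```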