Hi, I will try this approach with a prototype and get back.
Thanks,
Dev

On Thu, Jun 16, 2016 at 10:04 PM, Chandni Singh <[email protected]> wrote:

Dev,

The FileSystemWALWriter closes a temporary file as soon as it gets rotated. It renames (finalizes) the temporary file to the actual file once the window is committed. The mapping of temporary files to actual files is kept in the checkpointed state.

The FileSystemWALReader reads from the temporary files, so maybe you can use that to read the WAL.

Chandni

On Thu, Jun 16, 2016 at 9:55 PM, Devendra Tagare <[email protected]> wrote:

Hi,

WAL based approach:

The FileSystemWAL.FileSystemWALWriter closes a temporary file only after the window is committed; we cannot read any such files until that point. Once a file is committed, the ParquetOutputOperator will have to read the committed files in the same committed callback, convert the spooled records from the WAL to Parquet, and then write the Parquet file. A file can only be deleted from the WAL after it has been successfully written as a Parquet file.

Small files problem: to handle this with a WAL based approach, we will have to read files from the WAL until the Parquet block size is reached. This means the WAL reader could end up polling files for windows before the highest committed window, since the block size may not have been reached in the committed callback for a given window.

Fault tolerance: if the Parquet writes to the file system fail, the operator will go down. In this case we will have to add retry logic to re-read the files from the WAL for the windows that failed.

Please let me know if I am missing something in using the WAL, and also whether a 2-operator solution would be better suited in this case.

Thanks,
Dev
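A minimal plain-Java sketch of the committed-callback flow described above (not part of the original thread; the class and the helper methods readRecords, writeAsParquet and deleteFromWal are hypothetical placeholders rather than Malhar APIs, and in a real operator the committed(long) hook would come from Apex's CheckpointListener):

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Sketch: convert the WAL files finalized up to the committed window into
// Parquet, and delete a WAL file only after its Parquet output has been
// written successfully.
public abstract class WalToParquetSketch
{
  // windowId -> WAL files finalized in that window (part of checkpointed state)
  private final TreeMap<Long, Set<String>> finalizedWalFiles = new TreeMap<Long, Set<String>>();

  public void addFinalizedFiles(long windowId, Set<String> files)
  {
    finalizedWalFiles.put(windowId, files);
  }

  // In the real operator this would be CheckpointListener.committed(long).
  public void committed(long committedWindowId)
  {
    Iterator<Map.Entry<Long, Set<String>>> windows = finalizedWalFiles.entrySet().iterator();
    while (windows.hasNext()) {
      Map.Entry<Long, Set<String>> entry = windows.next();
      if (entry.getKey() > committedWindowId) {
        break; // only files finalized up to the committed window are safe to read
      }
      Iterator<String> files = entry.getValue().iterator();
      try {
        while (files.hasNext()) {
          String walFile = files.next();
          writeAsParquet(readRecords(walFile)); // convert the spooled records to Parquet
          deleteFromWal(walFile);               // delete only after a successful write
          files.remove();                       // already-converted files are skipped on retry
        }
        windows.remove();                       // everything for this window is done
      } catch (IOException e) {
        return; // keep the remaining files; they are retried on the next committed callback
      }
    }
  }

  // Hypothetical placeholders for WAL reading, Parquet writing and WAL cleanup.
  protected abstract List<Object> readRecords(String walFile) throws IOException;
  protected abstract void writeAsParquet(List<Object> records) throws IOException;
  protected abstract void deleteFromWal(String walFile) throws IOException;
}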
On Wed, Jun 15, 2016 at 5:02 PM, Thomas Weise <[email protected]> wrote:

Hi Dev,

Can you not use the existing WAL implementation (via WindowDataManager or directly)?

Thomas

On Wed, Jun 15, 2016 at 3:47 PM, Devendra Tagare <[email protected]> wrote:

Hi,

The initial thought was to go for a WAL based approach in which the operator would first write POJOs to the WAL, and a separate thread would then read from the WAL and write the destination files based on the block size.

There is a ticket open for a pluggable spooling implementation for output operators which can be leveraged for this:
https://issues.apache.org/jira/browse/APEXMALHAR-2037

Since work is already being done on that front, we can plug the spooler into the existing implementation of the ParquetFileWriter at that point and remove the first operator, ParquetFileOutputOperator.

Thanks,
Dev

On Tue, Jun 14, 2016 at 7:21 PM, Thomas Weise <[email protected]> wrote:

What's the reason for not considering the WAL based approach? What are the pros and cons?

On Tue, Jun 14, 2016 at 6:54 PM, Devendra Tagare <[email protected]> wrote:

Hi All,

We can focus on the below 2 problems:
1. Avoid the small files problem which could arise due to a flush at every endWindow, since there wouldn't be significant data in a single window.
2. Fault tolerance.

*Proposal*: Create a module in which there are 2 operators.

*Operator 1: ParquetFileOutputOperator*
This operator will be an implementation of the AbstractFileOutputOperator. It will write data to an HDFS location and leverage the fault-tolerance semantics of the AbstractFileOutputOperator.

This operator will implement the CheckpointNotificationListener and will emit the finalized files from the beforeCheckpoint method as a Map<windowId, Set<files finalized in the window>>.

*Operator 2: ParquetFileWriter*
This operator will receive a Set<files finalized in the window> from the ParquetFileOutputOperator on its input port. Once it receives this map, it will do the following:

1. Save the input received to a Map<windowId, Set<InputFiles>> inputFilesMap.

2. Instantiate a new ParquetWriter.
2.a. Get a unique file name.
2.b. Add a configurable writer that extends the ParquetWriter and includes a write support for writing the various supported formats like Avro, Thrift, etc.

3. For each file from the inputFilesMap:
3.a. Read the file and write its records using the writer created in (2).
3.b. Check whether the (configurable) block size is reached. If yes, close the file, add its entry to a Map<windowId, CompletedFiles> completedFilesMap, and remove the entry from the inputFilesMap. If the writes fail, the files can be reprocessed from the inputFilesMap.
3.c. In the committed callback, remove the completed files from the directory and prune the completedFilesMap for that window.

Points to note:
1. The block size check will be approximate since the data is in memory and ParquetWriter does not expose a flush.
2. This is at best a temporary implementation in the absence of a WAL based approach.

I would like to take a crack at this operator based on community feedback.

Thoughts?

Thanks,
Dev
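A minimal sketch of step 3 of the ParquetFileWriter proposal above, i.e. the block-size-driven rolling and the per-window bookkeeping (not part of the original thread; the class and all helper methods are hypothetical placeholders, not existing Malhar code):

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch: copy records from the finalized intermediate files into a Parquet
// file, rolling to a new Parquet file once the configured block size is
// (approximately) reached, and pruning the bookkeeping on committed.
public abstract class ParquetRollingWriterSketch
{
  private final long blockSize;             // configurable, e.g. 128 MB
  private long bytesInCurrentFile;
  private String currentParquetFile;

  // windowId -> Parquet files completed while processing that window (step 3.b)
  private final Map<Long, Set<String>> completedFilesMap = new HashMap<Long, Set<String>>();

  protected ParquetRollingWriterSketch(long blockSize)
  {
    this.blockSize = blockSize;
  }

  // Steps 3.a / 3.b: write one finalized input file, rolling over when needed.
  public void convert(long windowId, String inputFile) throws IOException
  {
    if (currentParquetFile == null) {
      currentParquetFile = newUniqueFileName(); // step 2.a
    }
    for (Object record : readRecords(inputFile)) {
      bytesInCurrentFile += writeRecord(currentParquetFile, record);
      if (bytesInCurrentFile >= blockSize) {    // approximate: the data is still in memory
        closeParquetFile(currentParquetFile);
        Set<String> completed = completedFilesMap.get(windowId);
        if (completed == null) {
          completed = new HashSet<String>();
          completedFilesMap.put(windowId, completed);
        }
        completed.add(currentParquetFile);
        currentParquetFile = null;
        bytesInCurrentFile = 0;
      }
    }
  }

  // Step 3.c: prune the bookkeeping for a window once it is committed.
  public void committed(long windowId)
  {
    completedFilesMap.remove(windowId);
  }

  // Hypothetical placeholders for the actual I/O (ParquetWriter + WriteSupport).
  protected abstract String newUniqueFileName();
  protected abstract Iterable<Object> readRecords(String inputFile) throws IOException;
  protected abstract long writeRecord(String parquetFile, Object record) throws IOException;
  protected abstract void closeParquetFile(String parquetFile) throws IOException;
}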
On Mon, Apr 25, 2016 at 12:36 PM, Tushar Gosavi <[email protected]> wrote:

Hi Shubham,

+1 for the Parquet writer.

I doubt we could leverage the recovery mechanism provided by AbstractFileOutputOperator, as the Parquet writer does not expose a flush and could write to the underlying stream at any time. To simplify recovery you can write a single file in each checkpoint duration. If this is not an option, then you need to make use of a WAL for recovery, and not use operator checkpointing for storing not-yet-persisted tuples, as checkpointing huge state every 30 seconds is costly.

Regards,
-Tushar.
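A short sketch of the "one file per checkpoint duration" suggestion above (not part of the original thread; the helper methods are hypothetical placeholders, and in a real operator the hook would be CheckpointNotificationListener.beforeCheckpoint(long)):

import java.io.IOException;

// Sketch: close the current Parquet file just before every checkpoint so that
// no partially written Parquet state has to be carried in the checkpoint or
// replayed after a failure.
public abstract class OneFilePerCheckpointSketch
{
  private String currentFile;

  public void write(Object record) throws IOException
  {
    if (currentFile == null) {
      currentFile = openNewFile();      // first record after a checkpoint starts a new file
    }
    writeRecord(currentFile, record);   // rows are buffered by the underlying Parquet writer
  }

  // In the real operator: CheckpointNotificationListener.beforeCheckpoint(long).
  public void beforeCheckpoint(long windowId) throws IOException
  {
    if (currentFile != null) {
      closeFile(currentFile);           // Parquet writes its row groups and footer on close
      currentFile = null;
    }
  }

  // Hypothetical placeholders for the underlying Parquet writer handling.
  protected abstract String openNewFile() throws IOException;
  protected abstract void writeRecord(String file, Object record) throws IOException;
  protected abstract void closeFile(String file) throws IOException;
}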
On Mon, Apr 25, 2016 at 11:38 PM, Shubham Pathak <[email protected]> wrote:

Hello Community,

Apache Parquet <https://parquet.apache.org/documentation/latest/> is a column-oriented binary file format designed to be extremely efficient and interoperable across the Hadoop ecosystem. It has integrations with most of the Hadoop processing frameworks (Impala, Hive, Pig, Spark, ...) and serialization models (Thrift, Avro, Protobuf), making it easy to use in ETL and processing pipelines.

Having an operator to write data to Parquet files would certainly be a good addition to the Malhar library.

The underlying implementation
<http://blog.cloudera.com/blog/2014/05/how-to-convert-existing-data-into-parquet/>
for writing data as Parquet requires a subclass of parquet.hadoop.api.WriteSupport that knows how to take an in-memory object and write Parquet primitives through parquet.io.api.RecordConsumer. Currently, there are several WriteSupport implementations, including ThriftWriteSupport, AvroWriteSupport, and ProtoWriteSupport. These WriteSupport implementations are then wrapped as ParquetWriter objects for writing.

Parquet writers do not expose a handle to the underlying stream. In order to write data to a Parquet file, all the records (that belong to the file) must be buffered in memory. These records are then compressed and later flushed to the file.

To start with, we could support the following features in the operator:

- *Ability to provide a WriteSupport implementation*: The user should be able to use existing implementations of parquet.hadoop.api.WriteSupport or provide his/her own implementation.
- *Ability to configure Page Size*: Refers to the amount of uncompressed data for a single column that is read before it is compressed as a unit and buffered in memory to be written out as a "page". Default value: 1 MB.
- *Ability to configure Parquet Block Size*: Refers to the amount of compressed data that should be buffered in memory before a row group is written out to disk. Larger block sizes require more memory to buffer the data; 128 MB / 256 MB is recommended.
- *Flushing files periodically*: The operator would have to flush files periodically into a specified directory as per the configured block size. This could be time based, number-of-events based or size based.

To implement the operator, here's one approach:

1. Extend the existing AbstractFileOutputOperator.
2. Provide methods to add write support implementations.
3. In the process method, hold the data in memory till we reach a configured size, and then flush the contents to a file during endWindow().

Please send across your thoughts on this. I would also like to know whether we would be able to leverage the recovery mechanisms provided by AbstractFileOutputOperator using this approach?

Thanks,
Shubham
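A small illustration (not from the thread) of how the WriteSupport, page size and block size settings listed above map onto the 2016-era parquet-avro API, assuming the pre-builder AvroParquetWriter constructor; older releases use the parquet.* package prefix instead of org.apache.parquet.*, and the schema, path and record contents here are made-up placeholders:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class AvroParquetWriterExample
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder schema; an operator would take this from configuration.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");

    int blockSize = 128 * 1024 * 1024;  // row group buffered in memory before being written out
    int pageSize = 1024 * 1024;         // per-column page size (the 1 MB default mentioned above)

    // AvroParquetWriter wires in AvroWriteSupport internally; a custom
    // WriteSupport would instead be wrapped by a ParquetWriter subclass.
    AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>(
        new Path("/tmp/events.parquet"), schema, CompressionCodecName.SNAPPY, blockSize, pageSize);
    try {
      GenericRecord record = new GenericData.Record(schema);
      record.put("id", 1L);
      writer.write(record);             // rows are buffered; the caller cannot force a flush
    } finally {
      writer.close();                   // row groups and the file footer are written here
    }
  }
}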
