Hi Ananth, nice writeup. A couple of questions/comments inline ->
On Tue, Aug 15, 2017 at 2:02 PM, Ananth G <ananthg.a...@gmail.com> wrote:

> Hello All,
>
> The implementation for the Apex Kudu Input Operator is ready for a pull
> request. Before raising the pull request, I would like to get any input
> regarding the design and incorporate any feedback in the next couple of
> days for the following JIRA:
>
> https://issues.apache.org/jira/browse/APEXMALHAR-2472
>
> The following are the main features that would be supported by the input
> operator:
>
> - The input operator would be used to scan all or some rows of a single
>   Kudu table.
> - Each Kudu row is translated to a POJO for downstream operators.
> - The input operator would accept an SQL expression (described in detail
>   below) that would be parsed to generate the equivalent scanner code for
>   the Kudu table. This is because the Kudu Table API does not support SQL
>   expressions.
> - The SQL expression would have additional options that help with Apache
>   Apex design patterns (e.g. sending a control tuple message after a
>   query is successfully processed).
> - The input operator works on a continuous basis, i.e. it would accept
>   the next query once the current query is complete.

This means the operator will repeat the query to fetch newly added rows,
similar to what the JDBC poll operator does, correct?

> - The operator will work in a distributed fashion for the input query.
>   This essentially means that for a single input query, the scan work is
>   distributed among all of the physical instances of the input operator.
> - Kudu splits a table into chunks of data regions called tablets. The
>   tablets are replicated and partitioned (range and hash partitions are
>   supported) in Kudu according to the Kudu table definition. The operator
>   allows partitioning of the input operator to be done in two ways:
>   - Map many Kudu tablets to one partition of the Apex Kudu input
>     operator
>   - Map one Kudu tablet to one partition of the Apex Kudu input operator
> - The partitioning does not change on a per-query basis, because of the
>   complex use cases that would arise. For example, if a query touches
>   only a few rows before the next query is accepted, repartitioning would
>   cause a lot of churn in terms of operator serialization/deserialization,
>   YARN allocation requests, etc. Supporting per-query partition planning
>   would also lead to a possibly very complex implementation and poor
>   resource usage, as all physical instances of the operator would have to
>   wait for their peers to complete their scans and then wait for the next
>   checkpoint to get repartitioned.

Agreed. What would be a reason to change partitioning between queries,
though?

> - The partitioner splits the workload of a single query in a round-robin
>   fashion. After a query plan is generated, each scan token range is
>   distributed equally among the physical operator instances.
> - The operator allows two modes of scanning for an application (this
>   cannot be changed on a per-query basis):
>   - Consistent-order scanner: only one tablet scan thread is active at
>     any given instant for a given query
>   - Random-order scanner: many threads are active to scan Kudu tablets
>     in parallel
> - As can be seen, the consistent-order scanner would be slower but helps
>   with better "exactly once" implementations if the correct method is
>   overridden in the operator.

Can you elaborate on this a bit more? Ordering within a streaming window
generally isn't deterministic when you have a shuffle or stream merge, and
the association between input records and streaming windows can be made
deterministic by using the window data manager?
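Also, on the round-robin split of scan tokens: to make sure I read it
correctly, here is a minimal sketch of how I picture one physical partition
claiming its share of the query plan, using the Kudu Java client's scan
token API (the class name, table name, and partition math are my own
assumptions, not from your proposal):

import java.util.ArrayList;
import java.util.List;

import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduScanToken;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResultIterator;

public class ScanTokenRoundRobinSketch {

  // Round robin: tokens i, i + n, i + 2n, ... go to physical partition i of n.
  static List<KuduScanToken> tokensFor(List<KuduScanToken> all,
                                       int partitionId, int numPartitions) {
    List<KuduScanToken> mine = new ArrayList<>();
    for (int i = partitionId; i < all.size(); i += numPartitions) {
      mine.add(all.get(i));
    }
    return mine;
  }

  public static void main(String[] args) throws Exception {
    KuduClient client = new KuduClient.KuduClientBuilder("kudu-master:7051").build();
    KuduTable table = client.openTable("metrics"); // hypothetical table name

    // One scan token per tablet/scan range: this is the query plan being split.
    List<KuduScanToken> tokens = client.newScanTokenBuilder(table).build();

    // Pretend we are physical partition 0 of 3.
    for (KuduScanToken token : tokensFor(tokens, 0, 3)) {
      KuduScanner scanner = token.intoScanner(client);
      while (scanner.hasMoreRows()) {
        RowResultIterator rows = scanner.nextRows();
        System.out.println(rows.getNumRows() + " rows in this batch");
        // the operator would turn each RowResult into a POJO here
      }
    }
    client.shutdown();
  }
}

If that is roughly the shape, I can see why you keep the physical partition
count fixed and only redo the token assignment per query.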
> - The operator introduces the DisruptorBlockingQueue for low-latency
>   buffer management. The LMAX disruptor library was considered, but based
>   on discussion threads on other Apache projects we settled on the
>   ConversantMedia implementation of the disruptor blocking queue. This
>   blocking queue is used when the Kudu scanner thread wants to send the
>   scanned row into the input operator's main thread emitTuples() call.
> - The operator allows for exactly-once semantics if the user specifies
>   the logic for reconciling a possible duplicate row in situations when
>   the operator is resuming from a checkpoint. This is done by overriding
>   a method that returns a boolean (true to emit the tuple, false to
>   suppress the tuple) while the operator is working in the reconciling
>   window phase. As can be seen, this reconciling phase is active for at
>   most one window.
> - The operator uses the FSWindowManager to manage metadata at the end of
>   every window. On resumption from a checkpoint, the operator will still
>   scan the Kudu tablets but simply not emit the rows that were already
>   streamed downstream. Subsequently, when the operator is in the
>   reconciling window, the method described above is invoked to allow
>   duplicates to be filtered. After this reconciling window, the operator
>   works in the normal mode of operation.

In which situation would I use this? How do I know the record was actually
processed downstream?
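To check my reading of the recovery path, is it roughly the shape below?
This is only how I picture it; the class and method names are mine, I am
assuming Apex's InputOperator/BaseOperator and the Conversant queue API,
and the scanner threads that offer() rows into the buffer plus the window
manager bookkeeping are omitted:

import java.util.concurrent.BlockingQueue;

import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
import com.conversantmedia.util.concurrent.SpinPolicy;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.common.util.BaseOperator;

// All names here are mine, not the actual Malhar operator's.
public abstract class KuduInputSketch<T> extends BaseOperator implements InputOperator {

  public final transient DefaultOutputPort<T> output = new DefaultOutputPort<>();

  // Kudu scanner thread(s) offer() scanned rows; the operator thread drains them.
  private transient BlockingQueue<T> buffer;

  // True only for the single reconciling window right after checkpoint recovery.
  protected transient boolean reconciling;

  @Override
  public void setup(OperatorContext context) {
    // Conversant disruptor queue; capacity and spin policy would be configurable.
    buffer = new DisruptorBlockingQueue<>(32768, SpinPolicy.WAITING);
  }

  @Override
  public void emitTuples() {
    T row;
    while ((row = buffer.poll()) != null) {
      if (!reconciling || isAllowedInReconcilingWindow(row)) {
        output.emit(row);
      }
    }
  }

  // User hook: return false to suppress a row that was already emitted before
  // the crash, e.g. by checking its key against the downstream store.
  protected boolean isAllowedInReconcilingWindow(T row) {
    return true;
  }
}

If that's right, the user's hook still needs some external signal (e.g. a
lookup against the sink) to know a row actually made it downstream, which
is what my question above is getting at.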
> - The following are the additional configurable aspects of the operator:
>   - Max tuples per window
>   - Spin policy and the buffer size for the disruptor blocking queue
>   - Mechanism to provide custom control tuples if required
>   - Setting the number of physical operator instances via the API if
>     required
>   - Setting the fault tolerance. If fault tolerant, an alternative
>     replica of the Kudu tablet is picked for scanning if the initial
>     tablet fails for whatever reason. However, this slows down the scan
>     throughput, hence it is configurable by the end user.
>
> Some notes regarding the SQL expression for the operator:
>
> - The operator uses ANTLR4 to parse the SQL expression.
> - The parser is based on a grammar file which is part of the source tree.
>   The grammar is compiled on every build as part of the build process,
>   and code is generated for the parser automatically.
> - The reasons we had to use a custom parser (as opposed to something like
>   Calcite) are:
>   - Kudu does not have all the features of a standard SQL expression. As
>     an example, != (not equal to) is not supported, nor is there a
>     concept of a join, etc.
>   - We are providing a lot more flexibility for the user to specify what
>     the control tuple message should be, should the end user choose to
>     send a control tuple downstream after the given query is done
>     processing.
> - The SQL expression can specify a set of options for processing of the
>   query:
>   - Control tuple message: a message/string that can be sent as the
>     control tuple field. There would be other parts to this control
>     tuple, like the query that was just completed and whether this is the
>     begin or end of the scan.

Will there be any support for watermarks that would work with a downstream
window operator? Another future idea might be to support Kudu as a source
in the Apex SQL API.

>   - Read snapshot time: Kudu supports specifying the read snapshot time
>     for which the scan has to occur. This is because Kudu is essentially
>     an MVCC engine and stores multiple versions of the same row. This
>     option allows the end user to specify the read snapshot time for the
>     scan.
> - The parser supports general syntax checking. If there is an error in
>   the SQL expression, the string representing the supplied SQL expression
>   is emitted onto an error port and the next query is taken up for
>   processing.
> - The data types supported are only those supported by the Kudu engine.
>   The parser is data-type aware; for example, string data types are
>   double quoted, etc.
> - The parser allows for a SELECT AA AS BB style of expression, wherein AA
>   is the column name in Kudu and BB is the name of the Java POJO field.
>
> Please let me know if the community has any other questions regarding the
> above design. I am planning to present this operator along with the Kudu
> output operator at the DataWorks Summit next month, and any feedback
> would be useful.
>
> Regards,
> Ananth
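P.S. On the read snapshot time option above: to confirm my understanding of
what the generated scanner would do, a minimal sketch against the Kudu Java
client (the master address, table name, and timestamp choice are made up):

import org.apache.kudu.client.AsyncKuduScanner.ReadMode;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;

public class SnapshotScanSketch {
  public static void main(String[] args) throws Exception {
    KuduClient client = new KuduClient.KuduClientBuilder("kudu-master:7051").build();
    KuduTable table = client.openTable("metrics"); // hypothetical table name

    // Scan the table as of one minute ago (Kudu timestamps are in microseconds).
    long snapshotMicros = (System.currentTimeMillis() - 60_000L) * 1000L;

    KuduScanner scanner = client.newScannerBuilder(table)
        .readMode(ReadMode.READ_AT_SNAPSHOT)
        .snapshotTimestampMicros(snapshotMicros)
        .build();

    while (scanner.hasMoreRows()) {
      System.out.println(scanner.nextRows().getNumRows() + " rows as of the snapshot");
    }
    client.shutdown();
  }
}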