Hi Ananth,

Nice writeup, a couple of questions/comments inline ->

On Tue, Aug 15, 2017 at 2:02 PM, Ananth G <ananthg.a...@gmail.com> wrote:

> Hello All,
>
> The implementation for Apex Kudu Input Operator is ready for a pull
> request. Before raising the pull request, I would like to get any inputs
> regarding the design and incorporate any feedback before raising the pull
> request in the next couple of days for the following JIRA.
>
> https://issues.apache.org/jira/browse/APEXMALHAR-2472
>
> The following are the main features that would be supported by the Input
> operator:
>
> - The Input operator would be used to scan all or some rows of a single
> Kudu table.
> - Each Kudu row is translated to a POJO for downstream operators.
> - The Input operator would accept an SQL expression (described in detail
> below) that would be parsed to generate the equivalent scanner code for the
> Kudu table. This is because the Kudu Table API does not support SQL
> expressions.
> - The SQL expression would have additional options that would help in
> Apache Apex design patterns (e.g. sending a control tuple message after a
> query is successfully processed).
> - The Input operator works on a continuous basis, i.e. it would accept the
> next query once the current query is complete.
>

This means the operator will repeat the query to fetch newly added rows,
similar to what the JDBC poll operator does, correct?
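
Also, to check my reading of the SQL-to-scanner translation: I assume the
parsed WHERE clause ends up as Kudu client predicates, something along these
lines (a minimal sketch against the Kudu Java client; the table, columns and
literal are made up):

    import java.util.Arrays;
    import org.apache.kudu.ColumnSchema;
    import org.apache.kudu.client.*;

    static void scanSketch() throws KuduException {
      KuduClient client =
          new KuduClient.KuduClientBuilder("kudu-master:7051").build();
      KuduTable table = client.openTable("transactions"); // hypothetical table
      ColumnSchema amountCol = table.getSchema().getColumn("amount");

      // e.g. "SELECT txn_id, amount FROM transactions WHERE amount > 100"
      KuduScanner scanner = client.newScannerBuilder(table)
          .setProjectedColumnNames(Arrays.asList("txn_id", "amount"))
          .addPredicate(KuduPredicate.newComparisonPredicate(
              amountCol, KuduPredicate.ComparisonOp.GREATER, 100L))
          .build();

      while (scanner.hasMoreRows()) {
        RowResultIterator rows = scanner.nextRows();
        while (rows.hasNext()) {
          RowResult row = rows.next();
          // here the operator would translate the row into the user's POJO
        }
      }
    }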

> - The operator will work in a distributed fashion for the input query. This
> essentially means for a single input query, the scan work is distributed
> among all of the physical instances of the input operator.
> - Kudu splits a table into chunks of data regions called Tablets. The
> tablets are replicated and partitioned (range and hash partitions are
> supported) in Kudu according to the Kudu Table definition. The operator
> allows partitioning of the Input Operator to be done in two ways:
>         - Map many Kudu Tablets to one partition of the Apex Kudu Input
> operator
>         - One Kudu Tablet maps to one partition of the Apex Kudu Input
> operator
> - The partitioning does not change on a per-query basis, because of
> the complex use cases that would arise. For example, if a query
> touches only a few rows before the next query is accepted, it would result
> in a lot of churn in terms of operator serialization/deserialization, YARN
> allocation requests, etc. Also, supporting per-query partition planning leads
> to a possibly very complex implementation and poor resource usage, as all
> physical instances of the operator have to wait for their peers to complete
> their scans and then wait for the next checkpoint to get repartitioned.
>

Agreed, what would be a reason to change partitioning between queries
though?


> - The partitioner splits the workload of a single query in a round-robin
> fashion. After a query plan is generated, each scan token range is
> distributed equally among the physical operator instances.
> - The operator allows for two modes of scanning for an application (this
> cannot be changed on a per-query basis):
>         - Consistent Order scanner - only one tablet scan thread is active
> at any given point in time for a given query
>         - Random Order scanner - many threads are active to scan Kudu
> tablets in parallel
> - As can be seen, the Consistent Order scanner would be slower but would
> help in better “exactly once” implementations if the correct method is
> overridden in the operator.
>

Can you elaborate on this a bit more? Ordering within a streaming window
generally isn't deterministic when you have a shuffle or stream merge. And
the association between input records and streaming windows can be made
deterministic by using the window data manager?
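
For reference, I'm assuming the per-query plan is built from Kudu's scan
tokens and the round-robin assignment looks roughly like this (a sketch;
the scan token API is the real Kudu client API, the method around it is
mine):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kudu.client.*;

    // split the scan tokens of one query across the physical operators
    static List<List<KuduScanToken>> planQuery(KuduClient client,
        KuduTable table, int numPartitions) throws KuduException {
      List<KuduScanToken> tokens = client.newScanTokenBuilder(table).build();
      List<List<KuduScanToken>> perOperator = new ArrayList<>();
      for (int i = 0; i < numPartitions; i++) {
        perOperator.add(new ArrayList<KuduScanToken>());
      }
      for (int i = 0; i < tokens.size(); i++) {
        perOperator.get(i % numPartitions).add(tokens.get(i)); // round robin
      }
      // each physical operator later turns its tokens back into scanners:
      //   KuduScanner s = KuduScanToken.deserializeIntoScanner(
      //       token.serialize(), client);
      return perOperator;
    }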


> - The operator introduces the DisruptorBlockingQueue for low-latency
> buffer management. The LMAX disruptor library was considered, but based on
> discussion threads on other Apache projects we settled on the
> ConversantMedia implementation of the Disruptor blocking queue. This
> blocking queue is used when the Kudu scanner thread wants to hand the
> scanned row to the input operator's main thread for emission in emitTuples().
> - The operator allows for exactly-once semantics if the user specifies the
> logic for reconciling a possible duplicate row in situations when the
> operator is resuming from a checkpoint. This is done by overriding a method
> that returns a boolean (true to emit the tuple and false to suppress the
> tuple) when the operator is working in the reconciling window phase. As
> can be seen, this reconciling phase is active for at most one window.
> - The operator uses the FSWindowDataManager to manage metadata at the end of
> every window. On resumption from a checkpoint, the operator will still scan
> the Kudu tablets but simply not emit the rows that were already streamed
> downstream. Subsequently, when the operator is in the reconciling window,
> the method described above is invoked to allow for duplicate filtering. After
> this reconciling window, the operator works in the normal mode of operation.
>
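
A note for other reviewers on the queue hand-off, as I understand it (a
sketch; the capacity, spin policy and POJO type are placeholders, only the
ConversantMedia classes are real):

    import java.util.concurrent.BlockingQueue;
    import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
    import com.conversantmedia.util.concurrent.SpinPolicy;

    class ScanBuffer {
      // shared between the Kudu scanner thread(s) and the operator thread
      private final BlockingQueue<MyRowPojo> buffer =
          new DisruptorBlockingQueue<>(4096, SpinPolicy.WAITING);

      // called from the Kudu scanner thread
      void push(MyRowPojo row) throws InterruptedException {
        buffer.put(row); // blocks while the ring buffer is full
      }

      // called from emitTuples() on the operator's main thread
      void drain(java.util.function.Consumer<MyRowPojo> emit) {
        MyRowPojo next;
        while ((next = buffer.poll()) != null) {
          emit.accept(next);
        }
      }
    }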

In which situation would I use this? How do I know the record was actually
processed downstream?
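
To make the question concrete, I'm imagining the hook looks something like
this (the base class, method name and store below are my guesses, not taken
from the patch):

    // hypothetical subclass of the (assumed) operator base class
    public class DedupingKuduInputOperator
        extends AbstractKuduInputOperator<TxnPojo> {

      @Override
      protected boolean acceptTupleInReconcilingWindow(TxnPojo tuple) {
        // e.g. look up an idempotent downstream store by primary key;
        // true -> emit the tuple, false -> suppress it as a duplicate
        return !downstreamStore.contains(tuple.getTxnId());
      }
    }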


> - The following are the additional configurable aspects of the operator:
>         - Max tuples per window
>         - Spin policy and the buffer size for the Disruptor Blocking Queue
>         - Mechanism to provide custom control tuples if required
>         - Setting the number of physical operator instances via the API if
> required.
>         - Setting the fault tolerance. If fault tolerant, an alternative
> replica of the Kudu tablet is picked up for scanning if the initial tablet
> fails for whatever reason. However, this slows down the scan throughput,
> hence it is configurable by the end user.
>
>
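
Also, for my own understanding: I assume wiring and configuring the operator
looks like any other Apex operator, e.g. (only the StreamingApplication/DAG
API is real; the operator class and setters are assumptions):

    import org.apache.hadoop.conf.Configuration;
    import com.datatorrent.api.DAG;
    import com.datatorrent.api.StreamingApplication;

    public class KuduScanApp implements StreamingApplication {
      @Override
      public void populateDAG(DAG dag, Configuration conf) {
        // class name and setters are assumed, not from the patch
        KuduInputOperator kuduInput =
            dag.addOperator("kuduInput", new KuduInputOperator());
        kuduInput.setMaxTuplesPerWindow(5000);
        kuduInput.setFaultTolerant(true);
        // connect the POJO output port downstream via dag.addStream(...)
      }
    }
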
> Some notes regarding the SQL expression for the operator:
>
> - The operator uses ANTLR4 to parse the SQL expression.
> - The parser is based on a grammar file which is part of the source tree.
> The grammar is compiled as part of every build, and the parser code is
> generated automatically.
> - The reasons we had to use a custom parser (as opposed to something
> like Calcite) are:
>         - Kudu does not have all the features of a standard SQL
> expression. As an example, != (not equal to) is not supported, nor is
> there a concept of a join, etc.
>         - We are providing a lot more flexibility for the user to specify
> what the control tuple message should be, should the end user choose to send
> a control tuple downstream after the given query is done processing.
> - The SQL expression can specify a set of options for processing of the
> query:
>         - Control tuple message: a message/string that can be sent as the
> control tuple field. There would be other parts to this control tuple, like
> the query that was just completed and whether this is the begin or the end
> of the scan.
>

Will there be any support for watermarks that would work with downstream
window operator? Another future idea might be to support Kudu as a source
in the Apex SQL API.
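
On the parser itself: I assume the generated classes are driven in the usual
ANTLR4 way, with an error listener feeding the error port you describe below
(sketch; KuduSqlLexer/KuduSqlParser and the rule name are assumed generated
names):

    import org.antlr.v4.runtime.*;

    void parse(String sql) {
      KuduSqlLexer lexer = new KuduSqlLexer(CharStreams.fromString(sql));
      KuduSqlParser parser = new KuduSqlParser(new CommonTokenStream(lexer));
      parser.removeErrorListeners();
      parser.addErrorListener(new BaseErrorListener() {
        @Override
        public void syntaxError(Recognizer<?, ?> recognizer, Object symbol,
            int line, int pos, String msg, RecognitionException e) {
          // emit the original SQL string on the error port and move on
        }
      });
      parser.kuduQuery(); // assumed top-level rule
    }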


>         - Read snapshot time: Kudu supports specifying the read snapshot
> time at which the scan has to occur, because Kudu is essentially
> an MVCC engine and stores multiple versions of the same row. This option
> lets the end user pick the snapshot against which the scan is executed.
> - The parser supports general syntax checking. If there is an error in
> the SQL expression, the string representing the supplied SQL expression is
> emitted onto an error port and the next query is taken up for processing.
> - The data types supported are only those supported by the Kudu engine. The
> parser handles data-type-specific parsing; for example, String data types
> are double quoted, etc.
> - The parser allows for a SELECT AA AS BB style of expression, wherein AA
> is the column name in Kudu and BB is the name of the Java POJO field.
>
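
And on the read snapshot time: I assume this maps onto the client's
READ_AT_SNAPSHOT mode, roughly (sketch; client and table as in my earlier
snippet, the timestamp variable is the parsed query option):

    import org.apache.kudu.client.AsyncKuduScanner;
    import org.apache.kudu.client.KuduScanner;

    KuduScanner snapshotScanner = client.newScannerBuilder(table)
        .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
        .snapshotTimestampMicros(readSnapshotTimeMicros)
        .build();
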
> Please let me know if the community has any other questions regarding the
> above design. I am planning to present this operator along with the Kudu
> output operator at the DataWorks Summit next month, and any feedback would
> be useful.
>
>
> Regards,
> Ananth
