Hello Thomas,

Replies inline, marked [Ananth]>> 

Apologies for the rather long reply; I think the original description needs 
more clarity. 

Regards,
Ananth

> On 19 Aug 2017, at 11:10 am, Thomas Weise <t...@apache.org> wrote:
> 
> Hi Ananth,
> 
> Nice writeup, couple questions/comments inline ->
> 
> On Tue, Aug 15, 2017 at 2:02 PM, Ananth G <ananthg.a...@gmail.com> wrote:
> 
>> Hello All,
>> 
>> The implementation for Apex Kudu Input Operator is ready for a pull
>> request. Before raising the pull request, I would like to get any inputs
>> regarding the design and incorporate any feedback before raising the pull
>> request in the next couple of days for the following JIRA.
>> 
>> https://issues.apache.org/jira/browse/APEXMALHAR-2472
>> 
>> The following are the main features that would be supported by the Input
>> operator:
>> 
>> - The input operator would be used to scan all or some rows of a single
>> kudu table.
>> - Each Kudu row is translated to a POJO for downstream operators.
>> - The Input operator would accept an SQL expression ( described in detail
>> below) that would be parsed to generate the equivalent scanner code for the
>> Kudu Table. This is because the Kudu Table API does not support SQL
>> expressions.
>> - The SQL expression would have additional options that would help in
>> Apache Apex design patterns ( Ex: Sending a control tuple message after a
>> query is successfully processed )
>> - The Input operator works on a continuous basis, i.e. it would accept the
>> next query once the current query is complete.
>> 
> 
> This means the operator will repeat the query to fetch newly added rows,
> similar to what the JDBC poll operator does, correct?
[Ananth]>> Yes. All of this is covered by the abstract implementation. In fact, 
there is a default implementation of the abstract operator that does exactly 
this, called IncrementalStepScanInputOperator. Driven by a properties file, it 
can provide the JDBC poll operator functionality by using a timestamp column as 
the incremental step value. 

The design is not limited to this pattern, however, and can accommodate others 
as well. Here is what I want to add in this context: 
    - One additional pattern is a “time travel” pattern. Since Kudu is an MVCC 
engine (and if it is appropriately configured), we can use this operator to 
answer questions like “Can I stream all or a subset of the Kudu table as it 
was at 1 AM, 2 AM, 3 AM ... today?”, even though the current time could be 
6 PM. (This is enabled by specifying READ_SNAPSHOT_TIME, which is a supported 
option of the SQL grammar we are enabling for this operator.)
    - Another interesting pattern is when the next query has no correlation 
with the previous query. Example use cases are an Apex CLI equivalent or, as a 
possible future use case, Apache Zeppelin integration. A query comes in ad hoc 
and the values are streamed for that incoming expression, i.e. when we want to 
enable interactive, query-based streaming.
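
To make the incremental step idea a little more concrete, here is a minimal 
sketch of how successive step ranges over a timestamp column could be derived. 
The class and method names are hypothetical and are not taken from the 
operator's code; each [lower, upper) range would then become the timestamp 
predicate of the next query (the expression syntax itself is not shown here).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, for illustration only; not part of the operator. */
final class IncrementalStepPlanner {
    private final long stepMillis;
    private long lowerBoundMillis; // inclusive lower bound of the next range

    IncrementalStepPlanner(long startMillis, long stepMillis) {
        if (stepMillis <= 0) {
            throw new IllegalArgumentException("stepMillis must be positive");
        }
        this.lowerBoundMillis = startMillis;
        this.stepMillis = stepMillis;
    }

    /**
     * Returns every complete [lower, upper) range that ends at or before
     * nowMillis, and advances the internal lower bound past them.
     */
    List<long[]> nextRanges(long nowMillis) {
        List<long[]> ranges = new ArrayList<>();
        while (lowerBoundMillis + stepMillis <= nowMillis) {
            long upper = lowerBoundMillis + stepMillis;
            ranges.add(new long[] {lowerBoundMillis, upper});
            lowerBoundMillis = upper;
        }
        return ranges;
    }
}
```

Only complete ranges are handed out, so a row whose timestamp falls in the 
still-open range is picked up by a later query rather than being scanned twice.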

> 
>> - The operator will work in a distributed fashion for the input query. This
>> essentially means for a single input query, the scan work is distributed
>> among all of the physical instances of the input operator.
>> - Kudu splits a table into chunks of data regions called Tablets. The
>> tablets are replicated and partitioned  (range and hash partitions are
>> supported ) in Kudu according to the Kudu Table definition. The operator
>> allows partitioning of the Input Operator to be done in 2 ways.
>>        - Map many Kudu Tablets to one partition of the Apex Kudu Input
>> operator
>>        - One Kudu Tablet maps to one partition of the Apex Kudu Input
>> operator
>> - The partitioning does not change on a per query basis. This is because
>> of the complex use cases that would arise. For example, if the query is
>> touching only a few rows before the next query is accepted, it would result
>> in a lot of churn in terms of operator serialize/deserialize, YARN
>> allocation requests etc. Also supporting per query partition planning leads
>> to possibly very complex implementation and poor resource usage as all
>> physical instances of the operator have to wait for their peers to complete
>> their scans and wait for the next checkpoint to get repartitioned.
>> 
> 
> Agreed, what would be a reason to change partitioning between queries
> though?
> 
[Ananth]>> I made that note more in the context of dynamic partitioning. My 
current understanding is that dynamic partitioning is driven entirely by 
performance stats, which does not make much sense here because the stats 
depend on the business logic (the query, in this case) rather than on the 
operational aspects that the StatsListener interface supports. So the abstract 
operator suppresses dynamic partitioning, as the stats would change 
drastically based on the query, the data distribution of the underlying Kudu 
cluster and the Kudu table definition. 

> 
>> - The partitioner splits the work load of a single query in a round robin
>> fashion. After a query plan is generated , each scan token range is
>> distributed equally among the physical operator instances.
>> - The operator allows for two modes of scanning for an application (
>> Cannot be changed on a per query basis )
>>        - Consistent Order scanner - only one tablet scan thread is active
>> at any given instance of time for a given query
>>        - Random Order scanner - Many threads are active to scan Kudu
>> tablets in parallel
>> - As can be seen, Consistent order scanner would be slower but would help
>> in better “exactly once” implementations if the correct method is
>> overridden in the operator.
>> 
> 
> Can you elaborate on this a bit more? Ordering within a streaming window
> generally isn't deterministic when you have a shuffle or stream merge. And
> the association between input records and streaming windows can be made
> deterministic by using the window data manager?
> 
[Ananth]>> The consistent order scanner essentially does two things: it 
ensures that only a single thread per Apex operator scans a Kudu tablet at a 
time, and it marks the scan as fault tolerant against Kudu tablet failures. I 
was referring more to the issue that deterministic ordering is not guaranteed 
at the Kudu tablet level itself when multiple tablets are mapped to a single 
Apex partition. In that case, multiple threads scan multiple Kudu tablets and 
all contribute to the buffer of a single partition of the Apex operator. If, 
however, the user configures the Apex operator with a one-to-one mapping or 
uses the consistent order scanner, we can at least guarantee the same ordering 
of the input tuples, provided the underlying Kudu table is not mutated for 
that query result.

Yes, you are right that we cannot claim deterministic ordering when there is a 
shuffle or merge in the downstream operators. There is no shuffle or merge 
within the Kudu input operator itself, though. The query planner ensures that 
the predicates are pushed down to the Kudu scan engine and the results are 
streamed to the individual partitions. If the user chooses to unify input 
operator instances because of application design needs, then ordering is 
definitely lost. 
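
For reference, the round robin distribution of scan tokens across the physical 
instances, as described in the original note, can be sketched as follows. This 
is only an illustration under my own naming; the generic type T stands in for 
whatever scan token type the planner produces, and the class is not part of 
the operator.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, for illustration only. */
final class RoundRobinAssigner {
    private RoundRobinAssigner() {
    }

    /** Assigns the token at index i to partition (i % partitionCount). */
    static <T> List<List<T>> assign(List<T> scanTokens, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        List<List<T>> assignments = new ArrayList<>(partitionCount);
        for (int p = 0; p < partitionCount; p++) {
            assignments.add(new ArrayList<>());
        }
        for (int i = 0; i < scanTokens.size(); i++) {
            assignments.get(i % partitionCount).add(scanTokens.get(i));
        }
        return assignments;
    }
}
```

With five tokens and two partitions, the first partition receives tokens 0, 2 
and 4 and the second receives tokens 1 and 3, so partition sizes differ by at 
most one.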

The window manager comment applies only to the input operator's state 
management; I did not mean that this input operator provides end-to-end 
functionality. This is why there is an overridable method called 
“isAllowedInReconcilingWindow”: when resuming from a checkpoint, only the 
business logic can decide whether a tuple needs to be reprocessed. By 
comparison, the Kafka implementation commits the offsets in the committed 
call, whereas I could not take that approach for the following reasons: 

- All of the physical instances of the Kudu input operator might not be 
processing the same query at any given instant! This is because as soon as a 
physical instance is done processing the current query, it asks for its next 
work order. Since the data distribution of the underlying Kudu cluster decides 
how long a physical instance of the Apex Kudu input operator spends on a work 
order, each instance can be at a different query in any given window.
- As long as the concrete implementation of the abstract input operator 
provides the same sequence of input queries/work orders, all of the physical 
instances should eventually process all of the data.
- The Kafka operator has the benefit of doing this in the committed call 
because it is based on offsets and the work order/Kafka partition does not 
change.
- In the case of the Kudu input operator, it is entirely possible that 
multiple different queries are processed and completed between checkpoint 
calls, and hence I thought it was sensible to guarantee state only at the 
checkpoint() call rather than at the committed call. 
- The abstract input operator automatically filters all the data up to the 
last window but one before the shutdown/crash. These windows are termed “safe 
mode” in the code.
- The window that was active when the shutdown/crash happened (called the 
reconciling window) needs a stronger check. Here the operator lets the user 
decide whether a tuple needs to be “re-streamed”, via the method 
“isAllowedInReconcilingWindow”. 
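
The replay behaviour in the last two points can be sketched roughly as below. 
This is a simplified illustration and not the operator's actual code: the 
class name, the window id comparison and the Predicate are my assumptions, and 
the Predicate merely stands in for the overridable 
isAllowedInReconcilingWindow method, whose real signature is not shown here.

```java
import java.util.function.Predicate;

/** Hypothetical sketch of the emit decision after resuming from a checkpoint. */
final class ReplayFilter<T> {
    // Id of the window that was active when the shutdown/crash happened.
    private final long reconcilingWindowId;
    // Stand-in for the user's isAllowedInReconcilingWindow override.
    private final Predicate<T> allowedInReconcilingWindow;

    ReplayFilter(long reconcilingWindowId, Predicate<T> allowedInReconcilingWindow) {
        this.reconcilingWindowId = reconcilingWindowId;
        this.allowedInReconcilingWindow = allowedInReconcilingWindow;
    }

    /** Decides whether a scanned tuple should be emitted downstream. */
    boolean shouldEmit(long currentWindowId, T tuple) {
        if (currentWindowId < reconcilingWindowId) {
            return false; // "safe mode": these windows were already streamed
        }
        if (currentWindowId == reconcilingWindowId) {
            return allowedInReconcilingWindow.test(tuple); // business logic decides
        }
        return true; // normal mode of operation
    }
}
```

The point of the sketch is only that the user-supplied check is consulted for 
at most one window; everything before it is suppressed and everything after it 
is emitted as usual.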


> 
>> - The operator introduces the DisruptorBlockingQueue for a low latency
>> buffer management. LMAX disruptor library was considered and based on some
>> other discussion threads on other Apache projects, settled on the
>> ConversantMedia implementation of the Disruptor Blocking queue. This
>> blocking queue is used when the kudu scanner thread wants to send the
>> scanned row into the input operators main thread emitTuples() call.
>> - The operator allows for exactly once semantics if the user specifies the
>> logic for reconciling a possible duplicate row in situations when the
>> operator is resuming from a checkpoint. This is done by overriding a method
>> that returns a boolean ( true to emit the tuple and false to suppress the
>> tuple ) when the operator is working in the reconciling window phase. As
>> can be seen, this reconciling phase is only active at the max for one
>> window.
> 
>> - The operator uses the FSWindowManager to manage metadata at the end of
>> every window. From resumption at a checkpoint, the operator will still scan
>> the Kudu tablets but simply not emit all rows that were already streamed
>> downstream. Subsequently when the operator is in the reconciling window,
>> the method described above is invoked to allow for duplicates filter. After
>> this reconciling window, the operator works in the normal mode of operation.
>> 
> 
> In which situation would I use this? How do I know the record was actually
> processed downstream?

[Ananth]>> I think this is covered in the response I added above. Since there 
is no “good way” to decide this in a truly distributed application, the 
decision is left to the end user. The input operator only guarantees, on a 
best-effort basis, that rows are not duplicated. The alternative would be to 
rely on the committed call, but then we might end up sending more duplicates 
downstream in case of a crash and possibly miss some queries as well, since 
each operator instance is at a different query at the time of the committed 
call. 

> 
> 
>> - The following are the additional configurable aspects of the operator
>>        - Max tuples per window
>>        - Spin policy and the buffer size for the Disruptor Blocking Queue
>>        - Mechanism to provide custom control tuples if required
>>        - Setting the number of Physical operator instances via the API if
>> required.
>>        - Setting the fault Tolerance. If fault tolerant , an alternative
>> replica of the Kudu tablet is picked up for scanning if the initial tablet
>> fails for whatever reason. However this slows down the scan throughput.
>> Hence it is configurable by the end user.
>> 
>> 
>> Some notes regarding the SQL expression for the operator:
>> 
>> - The operator uses ANTLR4 to parse the SQL expression.
>> - The parser is based on a grammar file which is part of the source tree.
>> The grammar is compiled on every build as part of the build process and
>> code is generated for the parser automatically.
>> - The reasons we had to use a custom parser (as opposed to something
>> like Calcite) are:
>>        - Kudu does not have all the features for a standard SQL
>> expression. As an example != ( not equal to ) is not supported. Nor is
>> there a concept of a Join etc.
>>        - We are providing a lot more flexibility for the user to specify
>> what the control tuple message should be should the end user choose to send
>> a control tuple downstream after the given query is done processing
>> - The SQL expression can specify a set of options for processing of the
>> query:
>>        - Control tuple message : A message/string that can be sent as the
>> Control tuple field. There would be other parts for this control tuple like
>> the query that was just completed and whether this is a begin or end of the
>> scan.
>> 
> 
> Will there be any support for watermarks that would work with downstream
> window operator? Another future idea might be to support Kudu as a source
> in the Apex SQL API.
> 
[Ananth]>> Currently a control tuple is emitted at the end of the query if 
this is enabled in the SQL expression. Apart from this, the operator also 
allows sending a control tuple at the beginning and end of the query for a 
given physical instance of the operator, should the user choose to. The 
control tuple type is entirely extensible and is hence a templated (generic) 
type parameter of the abstract operator. This essentially means that 
watermarks are supported by default if the user wants to send a custom marker. 
Note, however, that this control tuple can only be emitted at the beginning 
and end of a query. If you are referring to watermarks on the data itself, 
such as event-time based watermarks, I will need to explore this a bit more; 
it will perhaps need some enhancement.
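
As a rough illustration of what a custom begin/end marker could look like, 
here is a small sketch. All names are hypothetical; the fields simply mirror 
the parts mentioned in the original note (the message, the query that was 
processed, and whether this is the begin or end of the scan), and the factory 
interface only stands in for the generic control tuple type of the abstract 
operator.

```java
/** Hypothetical marker payload; the real control tuple type is user-defined. */
final class QueryBoundaryMarker {
    enum Boundary { BEGIN, END }

    private final String query;      // the query this marker refers to
    private final String message;    // the control tuple message from the expression
    private final Boundary boundary; // begin or end of the scan

    QueryBoundaryMarker(String query, String message, Boundary boundary) {
        this.query = query;
        this.message = message;
        this.boundary = boundary;
    }

    String getQuery() {
        return query;
    }

    String getMessage() {
        return message;
    }

    Boundary getBoundary() {
        return boundary;
    }
}

/** Hypothetical factory; C stands in for the operator's control tuple type. */
interface ControlTupleFactory<C> {
    C create(String query, String message, QueryBoundaryMarker.Boundary boundary);
}
```

A downstream operator could treat the END marker of a query as a coarse, 
per-query watermark; it says nothing about event time within the query.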

Support for the Apex SQL API is a great idea. I will need to do some more 
homework here, as I have to see how the custom parser and the Calcite 
integration would fit together; I believe we are using the Calcite parsers to 
enable some of the streaming API? The Kudu documentation does not yet mention 
any JDBC driver, so I need to see how well we can use Calcite in the right 
way. 

> 
>>        - Read Snapshot time : Kudu supports specifying the read snapshot
>> time for which the scan has to occur. This is because Kudu is essentially
>> an MVCC engine and stores multiple versions of the same row. The Read
>> snapshot time allows for the end user to specify the read snapshot time for
>> the scan.
>> - The parser supports general syntax checking. If there is an error in
>> the SQL expression , the string representing the SQL expression supplied is
>> emitted onto an error port and the next query is taken for processing.
>> - The data types supported are only those data types as supported by the
>> Kudu Engine. The parser supports data type parsing. For example
>> String data types are double quoted etc.
>> - The Parser allows for a SELECT AA as BB style of expressions wherein AA
>> is the column name in Kudu and BB is the name of the java POJO field name.
>> 
>> Please let me know if the community has any other questions regarding the
>> above design. I am planning to present this operator along with the Kudu
>> output operator in the Data works summit next month and any feedback would
>> be useful.
>> 
>> 
>> Regards,
>> Ananth
