[
https://issues.apache.org/jira/browse/APEXMALHAR-2472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16128171#comment-16128171
]
Ananth commented on APEXMALHAR-2472:
------------------------------------
The following are the main features that would be supported by the Input
operator:
- The input operator would be used to scan all or some rows of a single kudu
table.
- Each Kudu row is translated to a POJO for downstream operators.
- The Input operator would accept an SQL expression (described in detail
below) that would be parsed to generate the equivalent scanner code for the
Kudu table, since the Kudu Table API does not support SQL expressions natively.
- The SQL expression would have additional options that support common Apache
Apex design patterns (e.g. sending a control tuple message after a query is
successfully processed).
- The Input operator works on a continuous basis, i.e. it would accept the next
query once the current query is complete.
- The operator will work in a distributed fashion for the input query. This
essentially means that for a single input query, the scan work is distributed
among all of the physical instances of the input operator.
- Kudu splits a table into chunks of data regions called tablets. The tablets
are replicated and partitioned (range and hash partitions are supported) in
Kudu according to the Kudu table definition. The operator allows partitioning
of the Input operator to be done in two ways.
- Map many Kudu Tablets to one partition of the Apex Kudu Input operator
- One Kudu Tablet maps to one partition of the Apex Kudu Input operator
- The partitioning does not change on a per-query basis, because of the
complex use cases that would arise. For example, if a query touches only a few
rows before the next query is accepted, it would result in a lot of churn in
terms of operator serialize/deserialize cycles, YARN allocation requests, etc.
Also, supporting per-query partition planning leads to a possibly very complex
implementation and poor resource usage, as all physical instances of the
operator would have to wait for their peers to complete their scans and then
wait for the next checkpoint to get repartitioned.
- The partitioner splits the workload of a single query in a round-robin
fashion. After a query plan is generated, each scan token range is distributed
equally among the physical operator instances.
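The round-robin split described above can be sketched as follows. This is a
minimal illustration of the assignment scheme only; the class and method names
are hypothetical, and scan token ranges are modelled as plain strings rather
than Kudu scan tokens:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: distribute scan token ranges of one query
// round-robin across the physical operator instances.
public class RoundRobinPlanner {

    /** Assigns scanTokens round-robin across numPartitions buckets. */
    public static List<List<String>> assign(List<String> scanTokens, int numPartitions) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            buckets.add(new ArrayList<String>());
        }
        // Each token range goes to the next physical instance in turn, so the
        // scan work of a single query is spread evenly across all instances.
        for (int i = 0; i < scanTokens.size(); i++) {
            buckets.get(i % numPartitions).add(scanTokens.get(i));
        }
        return buckets;
    }
}
```

With five token ranges and two physical instances, one instance receives three
ranges and the other two, which is as even as the split can get.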
- The operator allows for two modes of scanning for an application (cannot be
changed on a per-query basis):
- Consistent Order scanner - only one tablet scan thread is active at
any given instant of time for a given query
- Random Order scanner - many threads are active to scan Kudu tablets
in parallel
- As can be seen, the Consistent Order scanner would be slower but would help
in better “exactly once” implementations if the correct method is overridden
in the operator.
- The operator introduces the DisruptorBlockingQueue for low-latency buffer
management. The LMAX Disruptor library was considered, but based on discussion
threads on other Apache projects we settled on the ConversantMedia
implementation of the Disruptor blocking queue. This blocking queue is used
when the Kudu scanner thread wants to hand a scanned row over to the input
operator's main thread in the emitTuples() call.
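The scanner-to-operator handoff can be sketched as a simple producer/consumer
pair over a BlockingQueue. ConversantMedia's DisruptorBlockingQueue implements
the standard java.util.concurrent.BlockingQueue interface; ArrayBlockingQueue
stands in below so the example is self-contained, and the class and method
names are illustrative, not the actual operator's API:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of the scanner-thread -> operator-thread handoff. The real operator
// would use com.conversantmedia.util.concurrent.DisruptorBlockingQueue, a
// drop-in BlockingQueue implementation tuned for low latency.
public class ScannerHandoff {
    private final BlockingQueue<String> buffer = new ArrayBlockingQueue<>(1024);

    // Called from the Kudu scanner thread for each scanned row.
    public void onRowScanned(String row) throws InterruptedException {
        buffer.put(row); // blocks (or spins, per the queue's policy) when full
    }

    // Called from the operator's main thread inside emitTuples().
    public String pollNextTuple() {
        return buffer.poll(); // non-blocking; null when no row is ready yet
    }
}
```

The bounded capacity provides back-pressure: a fast scanner thread is paused
rather than overwhelming the operator's emit loop.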
- The operator allows for exactly-once semantics if the user specifies the
logic for reconciling a possible duplicate row in situations where the
operator is resuming from a checkpoint. This is done by overriding a method
that returns a boolean (true to emit the tuple, false to suppress it) while
the operator is working in the reconciling window phase. As can be seen, this
reconciling phase is active for at most one window.
- The operator uses the FSWindowManager to manage metadata at the end of every
window. On resumption from a checkpoint, the operator will still scan the Kudu
tablets but simply not emit the rows that were already streamed downstream.
Subsequently, when the operator is in the reconciling window, the method
described above is invoked to allow duplicates to be filtered. After this
reconciling window, the operator works in the normal mode of operation.
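The reconciling-window behaviour described above can be sketched as below.
This is a toy model under stated assumptions: the class, the key type, and the
set of already-streamed keys are all hypothetical stand-ins, and the
overridable check mirrors the boolean method mentioned above without being the
operator's real signature:

```java
import java.util.Set;

// Hypothetical sketch of the reconciling-window duplicate filter: after
// resuming from a checkpoint, a row is emitted only if the user-overridable
// check returns true; the filter is active for at most one window.
public class ReconcilingEmitter {
    private final Set<String> alreadyEmittedKeys;
    private boolean reconciling = true; // true only during the reconciling window

    public ReconcilingEmitter(Set<String> keysStreamedBeforeCheckpoint) {
        this.alreadyEmittedKeys = keysStreamedBeforeCheckpoint;
    }

    // Stand-in for the overridable method: true => emit, false => suppress.
    protected boolean isEligibleForEmit(String rowKey) {
        return !alreadyEmittedKeys.contains(rowKey);
    }

    /** Returns whether the row should be emitted downstream. */
    public boolean emit(String rowKey) {
        if (reconciling && !isEligibleForEmit(rowKey)) {
            return false; // duplicate from before the checkpoint; suppress it
        }
        return true;
    }

    /** Called when the reconciling window closes; normal mode thereafter. */
    public void endWindow() {
        reconciling = false;
    }
}
```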
- The following are the additional configurable aspects of the operator:
- Max tuples per window
- Spin policy and the buffer size for the Disruptor Blocking Queue
- Mechanism to provide custom control tuples if required
- Setting the number of Physical operator instances via the API if
required.
- Setting the fault tolerance. If fault-tolerant, an alternative
replica of the Kudu tablet is picked for scanning if the initial tablet fails
for whatever reason. However, this slows down the scan throughput, hence it is
configurable by the end user.
Some notes regarding the SQL expression for the operator:
- The operator uses ANTLR4 to parse the SQL expression.
- The parser is based on a grammar file which is part of the source tree. The
grammar is compiled on every build as part of the build process and code is
generated for the parser automatically.
- The reasons we had to use a custom parser (as opposed to something like
Calcite) are:
- Kudu does not support all the features of a standard SQL expression.
As an example, != (not equal to) is not supported, nor is there a concept of
a join, etc.
- We are providing a lot more flexibility for the user to specify what
the control tuple message should be, should the end user choose to send a
control tuple downstream after the given query is done processing.
- The SQL expression can specify a set of options for processing of the query:
- Control tuple message: a message/string that can be sent as the
control tuple field. There would be other parts to this control tuple, like
the query that was just completed and whether this is the begin or end of the
scan.
- Read snapshot time: Kudu supports specifying the read snapshot time
at which the scan has to occur, because Kudu is essentially an MVCC engine and
stores multiple versions of the same row. This option lets the end user set
that snapshot time for the scan.
- The parser supports general syntax checking. If there is an error in the
SQL expression, the string representing the supplied SQL expression is emitted
onto an error port and the next query is taken up for processing.
- The only data types supported are those supported by the Kudu engine. The
parser is data-type aware; for example, String values are double-quoted, etc.
- The parser allows for a SELECT AA as BB style of expression, wherein AA is
the column name in Kudu and BB is the name of the Java POJO field.
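The aliasing above amounts to a Kudu-column-to-POJO-field mapping produced by
the parser. The sketch below illustrates just that mapping with a toy string
split; the class name is hypothetical and the real operator derives the
mapping from the ANTLR4 parse tree, not from split():

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of "SELECT AA as BB" aliasing: build a map from Kudu
// column name to Java POJO field name out of a projection list.
public class AliasMapper {

    /** Parses a projection like "user_id as userId, city" into column -> field. */
    public static Map<String, String> columnToField(String projection) {
        Map<String, String> mapping = new LinkedHashMap<>();
        for (String item : projection.split(",")) {
            String[] parts = item.trim().split("\\s+as\\s+");
            String column = parts[0].trim();
            // Without an alias, the POJO field name defaults to the column name.
            String field = (parts.length > 1) ? parts[1].trim() : column;
            mapping.put(column, field);
        }
        return mapping;
    }
}
```

The operator would then use such a mapping when populating the outgoing POJO
from each scanned Kudu row.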
> Implement Kudu Input Operator
> ------------------------------
>
> Key: APEXMALHAR-2472
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2472
> Project: Apache Apex Malhar
> Issue Type: New Feature
> Components: adapters database
> Reporter: Ananth
> Assignee: Ananth
>
> This operator would allow Kudu to be used as an Input store. This has
> multiple advantages like :
> - Ability to solve the dedup problem from an entire-data-set perspective. The
> dedup operators we have today are primarily window-based and might not meet
> all of the real-world use cases.
> - Ability to selectively stream data based on a SQL expression. Since Kudu is
> a structured store, we could effectively allow a SQL-expression-based "input"
> definition that would allow for selective streaming for all downstream
> operators. This could potentially be an alternative streaming-store pattern
> compared to Kafka, as Kafka does not allow for selective streaming of
> tuples.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)