[ https://issues.apache.org/jira/browse/APEXMALHAR-2278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ananth closed APEXMALHAR-2278.
------------------------------

> Implement Kudu Output Operator for non-transactional streams
> ------------------------------------------------------------
>
>                 Key: APEXMALHAR-2278
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2278
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>          Components: adapters database
>            Reporter: Ananth
>            Assignee: Ananth
>             Fix For: 3.8.0
>
>
> Here are some benefits of integrating Kudu and Apex:
> Kudu has just reached 1.0 and has been declared production ready.
> Kudu as a store might be a good fit for many architectures in the years to
> come because of its ability to provide mutability of data (unlike HDFS)
> and its storage formats optimized for low-latency scans.
> It also seems to withstand high-throughput write patterns, which makes it
> a stable sink for Apex workflows that operate at very high volumes.
> [Design]
> 1. The operator would be an AbstractOperator and would allow the concrete
> implementations to set a few behavioral aspects of the operator.
> 2. The following are the major phases of the operator:
>    During the activate() phase of the operator: Establish a connection to
> the cluster and get the metadata about the table that is being used as the
> sink.
>    During the setup() phase of the operator: Fetch the current window
> information and use it to decide if we are recovering from a failure mode.
> (See point 8 below)
>    During process() of the input port: Inspect the incoming
> ExecutionContext (see below) tuple and perform one of the operations
> (Insert/Update/Delete/Upsert)
> 3. The following parameters are tunable while establishing a Kudu
> connection:
>    Table name, Boss worker threads, Worker threads, Socket read timeouts
> and External Consistency mode.
> 4. The user need not specify any schema outright. The POJO fields are
> automatically mapped to the table column names as identified in the schema
> parse in the activate phase.
> 5. Allow the concrete implementation of the operator to override the
> mapping from POJO field name to table schema column name. This would allow
> flexibility in use cases where table schema column names are not compatible
> with Java bean frameworks, or where column names cannot be controlled
> because the POJO is coming from an upstream operator.
> 6. The input tuple that is to be supplied to this operator is of type "Kudu
> Execution Context". This tuple encompasses the actual POJO that is going to
> be persisted to the Kudu store. Additionally, it allows the upstream
> operator to specify the operation that needs to be performed. One of the
> following operations is permitted as part of the context: Insert, Upsert,
> Update and Delete on the POJO that is acting as the payload in the
> Execution Context.
> 7. The concrete implementation of the operator would allow the user to
> specify the actual POJO class definition that would be used to write to
> the table. The execution context would contain this POJO as well as the
> metadata that defines the behavior of the processing that needs to be done
> on that tuple.
> 8. The operator would allow for a special case of execution mode for the
> first window that is being processed as the operator gets activated. There
> are two modes for the first window of processing of the operator:
>    a. Safe Mode: Safe mode is the "happy path execution", as in no extra
> processing is required to perform the Kudu mutation.
>    b. Reconciling Mode: There is an additional function that would be
> called to see if the user would like the tuple to be used for mutation.
> This mode is automatically set when OperatorContext.ACTIVATION_WINDOW_ID !=
> Stateless.WINDOW_ID during the first window of processing by the operator.
> This feature is deemed to be useful when an operator is recovering from a
> crashed instance of the application and we do not want to perform multiple
> mutations of the same tuple, given ATLEAST_ONCE is the default semantics.
> 9. The operator is a stateless operator.
> 10. The operator would generate the following autometrics:
>    a. Counts of Inserts, Upserts, Deletes and Updates (separate counters
> for each mutation) for a given window
>    b. Bytes written in a given window
>    c. Write RPCs in the given window
>    d. Total RPC errors in this window
>    e. All of the above metrics over the entire lifetime of the operator.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
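
Points 6, 8 and 10a of the design can be illustrated with a minimal sketch in plain Java. It is not the Malhar implementation and uses no Apex or Kudu classes: the names KuduSinkSketch, ExecutionContext, AbstractSink and isEligibleForMutation are hypothetical, the two window ids passed to the constructor stand in for OperatorContext.ACTIVATION_WINDOW_ID and Stateless.WINDOW_ID, and a "mutation" is only recorded in a list rather than sent to a Kudu cluster.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class KuduSinkSketch {

    /** The four operations the design permits in an execution context. */
    enum Mutation { INSERT, UPDATE, DELETE, UPSERT }

    /** Hypothetical stand-in for the "Kudu Execution Context" tuple (point 6). */
    static final class ExecutionContext<T> {
        final T payload;
        final Mutation mutation;

        ExecutionContext(T payload, Mutation mutation) {
            this.payload = payload;
            this.mutation = mutation;
        }
    }

    /** Hypothetical operator skeleton; mutations are recorded, not sent to Kudu. */
    abstract static class AbstractSink<T> {
        private final boolean reconciling;
        private boolean firstWindow = true;
        final Map<Mutation, Long> windowCounts = new EnumMap<>(Mutation.class);
        final Map<Mutation, Long> lifetimeCounts = new EnumMap<>(Mutation.class);
        final List<String> applied = new ArrayList<>();

        /** Reconciling mode is chosen when the two window ids differ (point 8b). */
        AbstractSink(long activationWindowId, long statelessWindowId) {
            this.reconciling = activationWindowId != statelessWindowId;
        }

        /** User hook, consulted only in reconciling mode during the first window. */
        protected abstract boolean isEligibleForMutation(ExecutionContext<T> ctx);

        void beginWindow() { windowCounts.clear(); }   // per-window counters reset
        void endWindow() { firstWindow = false; }      // later windows run in safe mode

        void process(ExecutionContext<T> ctx) {
            if (reconciling && firstWindow && !isEligibleForMutation(ctx)) {
                return; // the user declined to replay this tuple
            }
            applied.add(ctx.mutation + ":" + ctx.payload);
            windowCounts.merge(ctx.mutation, 1L, Long::sum);   // point 10a
            lifetimeCounts.merge(ctx.mutation, 1L, Long::sum); // point 10e
        }
    }

    /** Simulates a restart in which payload "a" was already written before the crash. */
    static AbstractSink<String> demo() {
        final Set<String> alreadyWritten = Collections.singleton("a");
        // 42L and -1L are placeholder window ids chosen only so that they differ.
        AbstractSink<String> sink = new AbstractSink<String>(42L, -1L) {
            @Override
            protected boolean isEligibleForMutation(ExecutionContext<String> ctx) {
                return !alreadyWritten.contains(ctx.payload);
            }
        };
        sink.beginWindow();
        sink.process(new ExecutionContext<>("a", Mutation.INSERT)); // skipped by the hook
        sink.process(new ExecutionContext<>("b", Mutation.UPSERT)); // applied
        sink.endWindow();
        sink.beginWindow();
        sink.process(new ExecutionContext<>("a", Mutation.INSERT)); // applied, hook not consulted
        sink.endWindow();
        return sink;
    }

    public static void main(String[] args) {
        AbstractSink<String> sink = demo();
        System.out.println(sink.applied);
        System.out.println(sink.lifetimeCounts);
    }
}
```

In this sketch the hook is consulted only while the first window after a recovery is open, which mirrors the description of reconciling mode; how a real implementation decides whether a tuple was already written is left to the concrete operator.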