Kadir Ozdemir created PHOENIX-7001:
--------------------------------------

             Summary: Change Data Capture leveraging Max Lookback and Uncovered Indexes
                 Key: PHOENIX-7001
                 URL: https://issues.apache.org/jira/browse/PHOENIX-7001
             Project: Phoenix
          Issue Type: Improvement
            Reporter: Kadir Ozdemir


The use cases for a Change Data Capture (CDC) feature are centered around 
capturing changes to a given table (or updatable view) as these changes happen 
in near real-time. A CDC application can retrieve changes in real-time or with 
some delay, or even retrieve the same set of changes multiple times. This 
means the CDC use case can be generalized as time range queries where the time 
range is typically short, such as the last x minutes or hours, or is expressed 
as a specific time range in the last n days, where n is typically less than 7.

A change is an update in a row. That is, a change is either updating one or 
more columns of a table for a given row or deleting a row. It is desirable to 
provide these changes in the order of their arrival. One can visualize the 
delivery of these changes through a stream from a Phoenix table to the 
application that is initiated by the application similar to the delivery of any 
other Phoenix query results. The difference is that a regular query result 
includes at most one result row for each row satisfying the query and the 
deleted rows are not visible to the query result while the CDC stream/result 
can include multiple result rows for each row and the result includes deleted 
rows. Some use cases also need the pre and/or post image of the row along 
with each change to the row.

The design proposed here leverages Phoenix Max Lookback and Uncovered (Global 
or Local) Indexes. The max lookback feature retains recent changes to a table, 
that is, typically the changes made in the last x days. This 
means that the max lookback feature already captures the changes to a given 
table. Currently, the max lookback age is configurable at the cluster level. We 
need to extend this capability to be able to configure the max lookback age at 
the table level so that each table can have a different max lookback age based 
on its CDC application requirements.

To deliver the changes in the order of their arrival, we need a time based 
index. This index should be uncovered as the changes are already retained in 
the table by the max lookback feature. The arrival time can be defined as the 
mutation timestamp generated by the server, or a user-specified timestamp (or 
any other long integer) column. An uncovered index would give us efficient, 
ordered access to the changes. Changes to an index table are 
also preserved by the max lookback feature.

A CDC feature can be composed of the following components:
 * {*}CDCUncoveredIndexRegionScanner{*}: This is a server side scanner on an 
uncovered index used for CDC. This can inherit UncoveredIndexRegionScanner. It 
goes through index table rows using a raw scan to identify data table rows and 
retrieves these rows using a raw scan. Using the time range, it forms a JSON 
blob to represent changes to the row including pre and/or post row images.
 * {*}CDC Query Compiler{*}: This is a client side component. It prepares the 
scan object based on the given CDC query statement. 
 * {*}CDC DDL Compiler{*}: This is a client side component. It creates the time 
based uncovered (global/local) index based on the given CDC DDL statement and a 
virtual table of CDC type. CDC will be a new table type. 
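
The scanner's role described above can be illustrated with a minimal in-memory 
sketch. This is not Phoenix code: the Version tuple, the build_changes 
function, and the JSON field names (change, pre, post) are hypothetical, and a 
list of row versions stands in for the raw scans over the index and data 
tables. It assumes each version carries only the columns it changed and that a 
row delete is represented as None.

```python
import json
from typing import NamedTuple, Optional


class Version(NamedTuple):
    ts: int                 # mutation timestamp in milliseconds
    pk: str                 # data table row key
    cells: Optional[dict]   # changed columns; None marks a row delete (assumption)


def build_changes(versions, start_ts, end_ts, include=("pre", "post")):
    """Return (ts, pk, json_blob) for changes with start_ts <= ts < end_ts.

    Versions older than start_ts are replayed only to rebuild the pre image,
    which mirrors relying on max lookback to retain them.
    """
    images = {}  # pk -> current row image
    out = []
    for v in sorted(versions, key=lambda v: (v.ts, v.pk)):
        pre = images.get(v.pk)
        post = None if v.cells is None else {**(pre or {}), **v.cells}
        if post is None:
            images.pop(v.pk, None)
        else:
            images[v.pk] = post
        if start_ts <= v.ts < end_ts:
            blob = {"change": v.cells}
            if "pre" in include:
                blob["pre"] = pre
            if "post" in include:
                blob["post"] = post
            out.append((v.ts, v.pk, json.dumps(blob, sort_keys=True)))
    return out
```

The output is ordered by (timestamp, row key), matching the proposed CDC table 
PK layout, and a deleted row still produces a result row.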

A CDC DDL syntax to create CDC on a (data) table can be as follows: 

Create CDC <CDC Table Name> on <Data Table Name> (PHOENIX_ROW_TIMESTAMP()  | 
<Data Table Column>) INCLUDE (pre | post | latest | all) TTL = <Age in seconds> 
INDEX = <GLOBAL | LOCAL> SALT_BUCKETS=<salt bucket count>

The above CDC DDL creates a virtual CDC table and an uncovered index. The CDC 
table PK columns start with the timestamp or user defined column and continue 
with the data table PK columns. The CDC table includes one non-PK column which 
is a JSON column. The change is expressed in this JSON column in multiple ways 
based on the CDC DDL or query statement. The change can be expressed as just 
the mutation for the change, the latest image of the row, the pre image of the 
row (the image before the change), the post image, or any combination of these. 
The CDC table is not a physical table on disk. It is just a virtual table to be 
used in a CDC query. Phoenix stores just the metadata for this virtual table. 

A CDC query can be as follows:

Select * from <CDC Table Name> where PHOENIX_ROW_TIMESTAMP() >= TO_DATE( …) AND 
PHOENIX_ROW_TIMESTAMP() < TO_DATE( …)

This query would return the rows of the CDC table, which are constructed on the 
server side by CDCUncoveredIndexRegionScanner by joining the uncovered index 
row versions with the corresponding data table row versions (using raw scans). 
The above select query can be hinted with a new CDC hint to return just 
the actual change, the pre, post, or latest image of the row, or a combination 
of them, overriding the default JSON column format defined by the CDC DDL 
statement. 

The CDC application will run the above query in a loop, advancing the time 
range on each iteration. Let s be a safety margin in milliseconds and x be the 
difference between the application's current time and the upper limit of the 
query's time range. When x is less than s, the application thread needs to 
sleep s - x milliseconds before running the query. The value for s can be 
small, such as 1000 milliseconds. This makes sure that time skew among the 
server wall clocks does not lead to data loss.  
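
The sleep rule can be sketched as a small polling loop. This is only an 
illustration under assumptions: sleep_needed_ms, poll, and run_query are 
hypothetical names, the fixed-size consecutive windows are one possible way to 
advance the time range, and run_query stands in for issuing the CDC query for 
the window [lower, upper).

```python
import time


def sleep_needed_ms(now_ms, upper_ms, safety_ms=1000):
    """Milliseconds to wait so the window's upper limit is at least safety_ms old."""
    x = now_ms - upper_ms          # how far the upper limit lies in the past
    return max(0, safety_ms - x)   # sleep s - x when x < s, otherwise do not sleep


def poll(run_query, start_ms, window_ms, iterations, safety_ms=1000,
         now_ms=lambda: int(time.time() * 1000), sleep=time.sleep):
    """Run run_query(lower, upper) over consecutive windows, sleeping as needed."""
    lower = start_ms
    for _ in range(iterations):
        upper = lower + window_ms
        wait = sleep_needed_ms(now_ms(), upper, safety_ms)
        if wait > 0:
            sleep(wait / 1000.0)   # time.sleep takes seconds
        run_query(lower, upper)
        lower = upper              # the next window starts where this one ended
```

Passing the clock and the sleep function as parameters keeps the rule easy to 
test without real delays.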

A global time based index may create hot spotting during writes. This is 
because the same region of the global index will keep getting updated. Since 
the global index would be uncovered, its updates will usually be smaller than 
the data table updates. If we assume that index mutations are n 
times smaller than data table mutations, then a single index region will be 
able to sustain writes from n data table regions if the data table does not 
have any other indexes. When the data table has other indexes, data table 
writes can slow down by 3 times or so, which allows a single index region to 
keep up with about 3n data table regions. If the number of active data table 
regions is more than a single index region can sustain, then we need to 
distribute the load to multiple index regions using salting. 
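
The sizing argument above can be turned into a back-of-the-envelope estimate. 
The function name index_regions_needed and its parameters are hypothetical, and 
the slowdown factor of 3 is only the rough figure quoted above. Real capacity 
would have to be measured.

```python
import math


def index_regions_needed(active_data_regions, n, has_other_indexes=False, slowdown=3):
    """Estimate how many index regions are needed to absorb the write load.

    n is the assumed ratio of data table mutation size to index mutation size.
    One index region is assumed to sustain n data table regions, or about
    slowdown * n when other indexes already slow data table writes.
    """
    per_index_region = n * (slowdown if has_other_indexes else 1)
    return max(1, math.ceil(active_data_regions / per_index_region))
```

A result greater than 1 suggests salting the index, with the result as a 
starting point for the salt bucket count.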

A local time based index does not have the hot spotting issue but can result in 
slower CDC queries for tables with a large number of regions. That is why this 
proposal suggests using global indexes by default.


