[ 
https://issues.apache.org/jira/browse/PHOENIX-7001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viraj Jasani updated PHOENIX-7001:
----------------------------------
    Fix Version/s: 5.3.0

> Change Data Capture leveraging Max Lookback and Uncovered Indexes
> -----------------------------------------------------------------
>
>                 Key: PHOENIX-7001
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-7001
>             Project: Phoenix
>          Issue Type: Improvement
>            Reporter: Kadir Ozdemir
>            Assignee: Hari Krishna Dara
>            Priority: Major
>             Fix For: 5.3.0
>
>
> The use cases for a Change Data Capture (CDC) feature are centered around 
> capturing changes to a given table (or updatable view) as these changes 
> happen in near real-time. A CDC application can retrieve changes in real-time 
> or with some delay, or even retrieve the same set of changes multiple times. 
> This means the CDC use case can be generalized as time range queries where 
> the time range is typically short, such as the last x minutes or hours, or is 
> expressed as a specific time range within the last n days, where n is 
> typically less than 7.
> A change is an update to a row. That is, a change is either updating one or 
> more columns of a table for a given row or deleting a row. It is desirable to 
> provide these changes in the order of their arrival. One can visualize the 
> delivery of these changes through a stream from a Phoenix table to the 
> application that is initiated by the application similar to the delivery of 
> any other Phoenix query results. The difference is that a regular query 
> result includes at most one result row for each row satisfying the query and 
> the deleted rows are not visible to the query result while the CDC 
> stream/result can include multiple result rows for each row and the result 
> includes deleted rows. Some use cases need to also get the pre and/or post 
> image of the row along with a change on the row. 
> The design proposed here leverages Phoenix Max Lookback and Uncovered Global 
> Indexes. The max lookback feature retains recent changes to a table, that is, 
> typically the changes made in the last x days. This means that 
> the max lookback feature already captures the changes to a given table. 
> Currently, the max lookback age is configurable at the cluster level. We need 
> to extend this capability to be able to configure the max lookback age at the 
> table level so that each table can have a different max lookback age based on 
> its CDC application requirements.
> To deliver the changes in the order of their arrival, we need a time-based 
> index. This index should be uncovered as the changes are already retained in 
> the table by the max lookback feature. The arrival time will be defined as 
> the mutation timestamp generated by the server. An uncovered index would 
> allow us to access the changes efficiently and in order. Changes to an 
> index table are also preserved by the max lookback feature.
> A CDC feature can be composed of the following components:
>  * {*}CDCUncoveredIndexRegionScanner{*}: This is a server-side scanner on an 
> uncovered index used for CDC. This can inherit from UncoveredIndexRegionScanner. 
> It goes through index table rows using a raw scan to identify data table rows 
> and retrieves these rows using a raw scan. Using the time range, it forms a 
> JSON blob to represent changes to the row including pre and/or post row 
> images.
>  * {*}CDC Query Compiler{*}: This is a client-side component. It prepares the 
> scan object based on the given CDC query statement. 
>  * {*}CDC DDL Compiler{*}: This is a client-side component. Based on the given 
> CDC DDL statement, it creates the time-based uncovered global index and a 
> virtual table of CDC type. CDC will be a new table type. 
> A CDC DDL syntax to create CDC on a (data) table can be as follows: 
> Create CDC <CDC Table Name> on <Data Table Name> INCLUDE (pre | post)  
> SALT_BUCKETS=<salt bucket count>
> The above CDC DDL creates a virtual CDC table and an uncovered index. The CDC 
> table PK columns start with the timestamp and continue with the data table PK 
> columns. The CDC table includes one non-PK column which is a JSON column. The 
> change is expressed in this JSON column in multiple ways based on the CDC DDL 
> or query statement. The change can be expressed as just the mutation for the 
> change, the pre image of the row (the image before the change), the post 
> image, or any combination of these. The CDC table is not a physical table on 
> disk. It is just a virtual table to be used in a CDC query. Phoenix stores 
> just the metadata for this virtual table. 
> A CDC query can be as follows:
> Select * from <CDC Table Name> where PHOENIX_ROW_TIMESTAMP() >= TO_DATE( …) 
> AND PHOENIX_ROW_TIMESTAMP() < TO_DATE( …)
> This query would return the rows of the CDC table, which are constructed on the 
> server side by CDCUncoveredIndexRegionScanner by joining the uncovered index 
> row versions with the corresponding data table row versions (using raw scans). 
> The above select query can be given a new CDC hint to return 
> just the actual change, the pre or post image of the row, or a combination of 
> them, overriding the default JSON column format defined by the CDC DDL 
> statement. 
> The CDC application will run the above query in a loop. Let x be the 
> difference, in milliseconds, between the current time of the application and 
> the upper limit of the time range of the query. When x is less than s 
> milliseconds, the application thread needs to sleep s - x milliseconds. The 
> value for s can be small, such as 1000 milliseconds. This is to make sure that 
> time skew among the server wall clocks does not lead to data loss.  
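The sleep rule described above can be sketched in Python as follows. This is only an illustration under stated assumptions, not part of the proposal: the names `sleep_needed_ms`, `poll_changes`, and `run_query` are hypothetical, the sleep is assumed to happen before the query is issued, and each iteration is assumed to query the half-open range [lower, upper) so that consecutive windows neither overlap nor leave gaps.

```python
import time
from typing import Callable, List, Tuple


def sleep_needed_ms(now_ms: int, upper_ms: int, s_ms: int = 1000) -> int:
    """Return how long to sleep for a query whose time range ends at upper_ms.

    x is the gap between the application's clock and the upper limit.
    If x is already at least s, no sleep is needed; otherwise sleep s - x.
    """
    x_ms = now_ms - upper_ms
    if x_ms >= s_ms:
        return 0
    return s_ms - x_ms


def poll_changes(
    run_query: Callable[[int, int], List[Tuple[int, int]]],  # hypothetical query runner
    start_ms: int,
    window_ms: int,
    iterations: int,
    s_ms: int = 1000,
    now_ms: Callable[[], int] = lambda: int(time.time() * 1000),
    sleep: Callable[[float], None] = time.sleep,
) -> list:
    """Run the CDC query over consecutive [lower, upper) windows."""
    lower = start_ms
    results = []
    for _ in range(iterations):
        upper = lower + window_ms
        delay = sleep_needed_ms(now_ms(), upper, s_ms)
        if delay > 0:
            sleep(delay / 1000.0)  # time.sleep takes seconds
        results.extend(run_query(lower, upper))
        lower = upper  # next window starts where this one ended
    return results
```

In a real application, `run_query` would execute the CDC select statement with the two bounds as the time range; here it is left as a callable so the timing logic can be exercised without a cluster.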
> A global time-based index may create hot spotting during writes. This is 
> because the same region of the global index will keep getting updated. Since 
> the global index would be uncovered, the size of the updates will usually be 
> smaller than the data table updates. If we assume that index mutations are n 
> times smaller than data table mutations, then a single index region will be 
> able to sustain writes from n data table regions if the data table does not 
> have any other indexes. When the data table has other indexes, the data table 
> write can slow down by 3 times or so. This allows a single index region to 
> keep up with 3n data table regions. If the number of active data table regions 
> is more than a single index region can sustain, then we need to distribute the 
> load to multiple index regions using salting. 
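The capacity reasoning above can be turned into a rough back-of-the-envelope calculation. The Python sketch below is only an illustration: the function name `index_regions_needed` is hypothetical, the 3x slowdown is the approximate figure from the description, and it assumes that salting spreads the write load evenly across index regions, which the description does not state.

```python
import math


def index_regions_needed(
    active_data_regions: int,
    n: float,
    has_other_indexes: bool,
    slowdown: float = 3.0,  # "3 times or so" from the description
) -> int:
    """Estimate how many index regions are needed to absorb the write load.

    One index region sustains n data table regions when the data table has
    no other indexes, and about slowdown * n regions when it does.
    A result of 1 means a single index region suffices (no salting needed).
    """
    capacity = n * slowdown if has_other_indexes else n
    return max(1, math.ceil(active_data_regions / capacity))
```

For example, with 40 active data table regions and n = 4, the estimate is 10 index regions without other indexes and 4 with other indexes.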
> A local time-based index does not have the hot spotting issue but can result 
> in slower CDC queries for tables with a large number of regions. That is why 
> this proposal suggests using global indexes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
