[ 
https://issues.apache.org/jira/browse/PHOENIX-7001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kadir Ozdemir updated PHOENIX-7001:
-----------------------------------
    Description: 
The use cases for a Change Data Capture (CDC) feature are centered around 
capturing changes to a given table (or updatable view) as these changes happen 
in near real-time. A CDC application can retrieve changes in real-time or with 
some delay, or even retrieves the same set of changes multiple times. This 
means the CDC use case can be generalized as time range queries where the time 
range is typically short such as last x minutes or hours or expressed as a 
specific time range in the last n days where n is typically less than 7.

A change is an update in a row. That is, a change is either updating one or 
more columns of a table for a given row or deleting a row. It is desirable to 
provide these changes in the order of their arrival. One can visualize the 
delivery of these changes through a stream from a Phoenix table to the 
application that is initiated by the application similar to the delivery of any 
other Phoenix query results. The difference is that a regular query result 
includes at most one result row for each row satisfying the query and the 
deleted rows are not visible to the query result while the CDC stream/result 
can include multiple result rows for each row and the result includes deleted 
rows. Some use cases need to also get the pre and/or post image of the row 
along with a change on the row. 

The design proposed here leverages Phoenix Max Lookback and Uncovered Global 
Indexes. The max lookback feature retains recent changes to a table, that is, 
the changes that have been done in the last x days typically. This means that 
the max lookback feature already captures the changes to a given table. 
Currently, the max lookback age is configurable at the cluster level. We need 
to extend this capability to be able to configure the max lookback age at the 
table level so that each table can have a different max lookback age based on 
its CDC application requirements.

To deliver the changes in the order of their arrival, we need a time based 
index. This index should be uncovered as the changes are already retained in 
the table by the max lookback feature. The arrival time will be defined as the 
mutation timestamp generated by the server. An uncovered index would allow us 
to efficiently and orderly access to the changes. Changes to an index table are 
also preserved by the max lookback feature.

A CDC feature can be composed of the following components:
 * {*}CDCUncoveredIndexRegionScanner{*}: This is a server side scanner on an 
uncovered index used for CDC. This can inherit UncoveredIndexRegionScanner. It 
goes through index table rows using a raw scan to identify data table rows and 
retrieves these rows using a raw scan. Using the time range, it forms a JSON 
blob to represent changes to the row including pre and/or post row images.
 * {*}CDC Query Compiler{*}: This is a client side component. It prepares the 
scan object based on the given CDC query statement. 
 * {*}CDC DDL Compiler{*}: This is a client side component. It creates the time 
based uncovered global index based on the given CDC DDL statement and a virtual 
table of CDC type. CDC will be a new table type. 

A CDC DDL syntax to create CDC on a (data) table can be as follows: 

Create CDC <CDC Table Name> on <Data Table Name> INCLUDE (pre | post)  
SALT_BUCKETS=<salt bucket count>

The above CDC DDL creates a virtual CDC table and an uncovered index. The CDC 
table PK columns start with the timestamp and continue with the data table PK 
columns. The CDC table includes one non-PK column which is a JSON column. The 
change is expressed in this JSON column in multiple ways based on the CDC DDL 
or query statement. The change can be expressed as just the mutation for the 
change, the pre image of the row (the image before the change), the post image, 
or any combination of these. The CDC table is not a physical table on disk. It 
is just a virtual table to be used in a CDC query. Phoenix stores just the 
metadata for this virtual table. 

A CDC query can be as follow:

Select * from <CDC Table Name> where PHOENIX_ROW_TIMESTAMP() >= TO_DATE( …) AND 
PHOENIX_ROW_TIMESTAMP() < TO_DATE( …)

This query would return the rows of the CDC table which is constructed on the 
server side by CDCUncoveredIndexRegionScanner by joining the uncovered index 
row versions with the corresponding data table row version (using raw scans). 
The above select query can be hinted at by using a new CDC hint to return just 
the actual change, pre, or post image of the row, or a combination of them to 
overwrite the default JSON column format defined by the CDC DDL statement. 

The CDC application will run the above query in a loop. When the difference 
between the current time of the application and the upper limit of the time 
range of the query becomes less than s milliseconds, say x milliseconds, then 
the application thread needs to sleep s - x milliseconds. The value for s can 
be small such as 1000 milliseconds. This is to make sure that time skew among 
the server wall clocks does not lead to data loss.  

A global time based index may create hot spotting during writes. This is 
because the same region of the global index will keep getting updated. Since 
the global index would be uncovered, the size of the updates will be usually 
smaller than the data table updates. If we assume that index mutations are n 
times smaller than data table mutations, then a single index region will be 
able to sustain writes from n data table regions if the data table does not 
have any other indexes. When the data table has other indexes, the data table 
write can slow down by 3 times or so. This allows a single index region to 
match with 3n data table regions. If the number of active data table regions is 
more than a single index region can sustain then we need to distribute the load 
to multiple index regions using salting. 

A local time based index does not have the hot spotting issue but can result in 
slower CDC queries for tables with a large number of regions. That is why this 
proposal suggests using global indexes.

  was:
The use cases for a Change Data Capture (CDC) feature are centered around 
capturing changes to a given table (or updatable view) as these changes happen 
in near real-time. A CDC application can retrieve changes in real-time or with 
some delay, or even retrieves the same set of changes multiple times. This 
means the CDC use case can be generalized as time range queries where the time 
range is typically short such as last x minutes or hours or expressed as a 
specific time range in the last n days where n is typically less than 7.

A change is an update in a row. That is, a change is either updating one or 
more columns of a table for a given row or deleting a row. It is desirable to 
provide these changes in the order of their arrival. One can visualize the 
delivery of these changes through a stream from a Phoenix table to the 
application that is initiated by the application similar to the delivery of any 
other Phoenix query results. The difference is that a regular query result 
includes at most one result row for each row satisfying the query and the 
deleted rows are not visible to the query result while the CDC stream/result 
can include multiple result rows for each row and the result includes deleted 
rows. Some use cases need to also get the pre and/or post image of the row 
along with a change on the row. 

The design proposed here leverages Phoenix Max Lookback and Uncovered (Global 
or Local) Indexes. The max lookback feature retains recent changes to a table, 
that is, the changes that have been done in the last x days typically. This 
means that the max lookback feature already captures the changes to a given 
table. Currently, the max lookback age is configurable at the cluster level. We 
need to extend this capability to be able to configure the max lookback age at 
the table level so that each table can have a different max lookback age based 
on its CDC application requirements.

To deliver the changes in the order of their arrival, we need a time based 
index. This index should be uncovered as the changes are already retained in 
the table by the max lookback feature. The arrival time will be defined as the 
mutation timestamp generated by the server. An uncovered index would allow us 
to efficiently and orderly access to the changes. Changes to an index table are 
also preserved by the max lookback feature.

A CDC feature can be composed of the following components:
 * {*}CDCUncoveredIndexRegionScanner{*}: This is a server side scanner on an 
uncovered index used for CDC. This can inherit UncoveredIndexRegionScanner. It 
goes through index table rows using a raw scan to identify data table rows and 
retrieves these rows using a raw scan. Using the time range, it forms a JSON 
blob to represent changes to the row including pre and/or post row images.
 * {*}CDC Query Compiler{*}: This is a client side component. It prepares the 
scan object based on the given CDC query statement. 
 * {*}CDC DDL Compiler{*}: This is a client side component. It creates the time 
based uncovered (global/local) index based on the given CDC DDL statement and a 
virtual table of CDC type. CDC will be a new table type. 

A CDC DDL syntax to create CDC on a (data) table can be as follows: 

Create CDC <CDC Table Name> on <Data Table Name> INCLUDE (pre | post | latest | 
all) INDEX = <GLOBAL | LOCAL> SALT_BUCKETS=<salt bucket count>

The above CDC DDL creates a virtual CDC table and an uncovered index. The CDC 
table PK columns start with the timestamp and continue with the data table PK 
columns. The CDC table includes one non-PK column which is a JSON column. The 
change is expressed in this JSON column in multiple ways based on the CDC DDL 
or query statement. The change can be expressed as just the mutation for the 
change, the latest image of the row, the pre image of the row (the image before 
the change), the post image, or any combination of these. The CDC table is not 
a physical table on disk. It is just a virtual table to be used in a CDC query. 
Phoenix stores just the metadata for this virtual table. 

A CDC query can be as follow:

Select * from <CDC Table Name> where PHOENIX_ROW_TIMESTAMP() >= TO_DATE( …) AND 
PHOENIX_ROW_TIMESTAMP() < TO_DATE( …)

This query would return the rows of the CDC table which is constructed on the 
server side by CDCUncoveredIndexRegionScanner by joining the uncovered index 
row versions with the corresponding data table row version (using raw scans). 
The above select query can be hinted at by using a new CDC hint to return just 
the actual change, pre, pos, or latest image of the row, or a combination of 
them to overwrite the default JSON column format defined by the CDC DDL 
statement. 

The CDC application will run the above query in a loop. When the difference 
between the current time of the application and the upper limit of the time 
range of the query becomes less than s milliseconds, say x milliseconds, then 
the application thread needs to sleep s - x milliseconds. The value for s can 
be small such as 1000 milliseconds. This is to make sure that time skew among 
the server wall clocks does not lead to data loss.  

A global time based index may create hot spotting during writes. This is 
because the same region of the global index will keep getting updated. Since 
the global index would be uncovered, the size of the updates will be usually 
smaller than the data table updates. If we assume that index mutations are n 
times smaller than data table mutations, then a single index region will be 
able to sustain writes from n data table regions if the data table does not 
have any other indexes. When the data table has other indexes, the data table 
write can slow down by 3 times or so. This allows a single index region to 
match with 3n data table regions. If the number of active data table regions is 
more than a single index region can sustain then we need to distribute the load 
to multiple index regions using salting. 

A local time based index does not have the hot spotting issue but can result in 
slower CDC queries for tables with a large number of regions. That is why this 
proposal suggests using global indexes by default.


> Change Data Capture leveraging Max Lookback and Uncovered Indexes
> -----------------------------------------------------------------
>
>                 Key: PHOENIX-7001
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-7001
>             Project: Phoenix
>          Issue Type: Improvement
>            Reporter: Kadir Ozdemir
>            Priority: Major
>
> The use cases for a Change Data Capture (CDC) feature are centered around 
> capturing changes to a given table (or updatable view) as these changes 
> happen in near real-time. A CDC application can retrieve changes in real-time 
> or with some delay, or even retrieves the same set of changes multiple times. 
> This means the CDC use case can be generalized as time range queries where 
> the time range is typically short such as last x minutes or hours or 
> expressed as a specific time range in the last n days where n is typically 
> less than 7.
> A change is an update in a row. That is, a change is either updating one or 
> more columns of a table for a given row or deleting a row. It is desirable to 
> provide these changes in the order of their arrival. One can visualize the 
> delivery of these changes through a stream from a Phoenix table to the 
> application that is initiated by the application similar to the delivery of 
> any other Phoenix query results. The difference is that a regular query 
> result includes at most one result row for each row satisfying the query and 
> the deleted rows are not visible to the query result while the CDC 
> stream/result can include multiple result rows for each row and the result 
> includes deleted rows. Some use cases need to also get the pre and/or post 
> image of the row along with a change on the row. 
> The design proposed here leverages Phoenix Max Lookback and Uncovered Global 
> Indexes. The max lookback feature retains recent changes to a table, that is, 
> the changes that have been done in the last x days typically. This means that 
> the max lookback feature already captures the changes to a given table. 
> Currently, the max lookback age is configurable at the cluster level. We need 
> to extend this capability to be able to configure the max lookback age at the 
> table level so that each table can have a different max lookback age based on 
> its CDC application requirements.
> To deliver the changes in the order of their arrival, we need a time based 
> index. This index should be uncovered as the changes are already retained in 
> the table by the max lookback feature. The arrival time will be defined as 
> the mutation timestamp generated by the server. An uncovered index would 
> allow us to efficiently and orderly access to the changes. Changes to an 
> index table are also preserved by the max lookback feature.
> A CDC feature can be composed of the following components:
>  * {*}CDCUncoveredIndexRegionScanner{*}: This is a server side scanner on an 
> uncovered index used for CDC. This can inherit UncoveredIndexRegionScanner. 
> It goes through index table rows using a raw scan to identify data table rows 
> and retrieves these rows using a raw scan. Using the time range, it forms a 
> JSON blob to represent changes to the row including pre and/or post row 
> images.
>  * {*}CDC Query Compiler{*}: This is a client side component. It prepares the 
> scan object based on the given CDC query statement. 
>  * {*}CDC DDL Compiler{*}: This is a client side component. It creates the 
> time based uncovered global index based on the given CDC DDL statement and a 
> virtual table of CDC type. CDC will be a new table type. 
> A CDC DDL syntax to create CDC on a (data) table can be as follows: 
> Create CDC <CDC Table Name> on <Data Table Name> INCLUDE (pre | post)  
> SALT_BUCKETS=<salt bucket count>
> The above CDC DDL creates a virtual CDC table and an uncovered index. The CDC 
> table PK columns start with the timestamp and continue with the data table PK 
> columns. The CDC table includes one non-PK column which is a JSON column. The 
> change is expressed in this JSON column in multiple ways based on the CDC DDL 
> or query statement. The change can be expressed as just the mutation for the 
> change, the pre image of the row (the image before the change), the post 
> image, or any combination of these. The CDC table is not a physical table on 
> disk. It is just a virtual table to be used in a CDC query. Phoenix stores 
> just the metadata for this virtual table. 
> A CDC query can be as follow:
> Select * from <CDC Table Name> where PHOENIX_ROW_TIMESTAMP() >= TO_DATE( …) 
> AND PHOENIX_ROW_TIMESTAMP() < TO_DATE( …)
> This query would return the rows of the CDC table which is constructed on the 
> server side by CDCUncoveredIndexRegionScanner by joining the uncovered index 
> row versions with the corresponding data table row version (using raw scans). 
> The above select query can be hinted at by using a new CDC hint to return 
> just the actual change, pre, or post image of the row, or a combination of 
> them to overwrite the default JSON column format defined by the CDC DDL 
> statement. 
> The CDC application will run the above query in a loop. When the difference 
> between the current time of the application and the upper limit of the time 
> range of the query becomes less than s milliseconds, say x milliseconds, then 
> the application thread needs to sleep s - x milliseconds. The value for s can 
> be small such as 1000 milliseconds. This is to make sure that time skew among 
> the server wall clocks does not lead to data loss.  
> A global time based index may create hot spotting during writes. This is 
> because the same region of the global index will keep getting updated. Since 
> the global index would be uncovered, the size of the updates will be usually 
> smaller than the data table updates. If we assume that index mutations are n 
> times smaller than data table mutations, then a single index region will be 
> able to sustain writes from n data table regions if the data table does not 
> have any other indexes. When the data table has other indexes, the data table 
> write can slow down by 3 times or so. This allows a single index region to 
> match with 3n data table regions. If the number of active data table regions 
> is more than a single index region can sustain then we need to distribute the 
> load to multiple index regions using salting. 
> A local time based index does not have the hot spotting issue but can result 
> in slower CDC queries for tables with a large number of regions. That is why 
> this proposal suggests using global indexes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to