[jira] [Updated] (PHOENIX-7001) Change Data Capture leveraging Max Lookback and Uncovered Indexes

2024-04-18 Thread Kadir Ozdemir (Jira)


 [ 
https://issues.apache.org/jira/browse/PHOENIX-7001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kadir Ozdemir updated PHOENIX-7001:
---
Description: 
The use cases for a Change Data Capture (CDC) feature center on capturing 
changes to a given table (or updatable view) as these changes happen in near 
real time. A CDC application can retrieve changes in real time or with some 
delay, or even retrieve the same set of changes multiple times. This means 
the CDC use case can be generalized as time range queries where the time range 
is typically short, such as the last x minutes or hours, or is expressed as a 
specific time range within the last n days, where n is typically less than 7.

A change is an update to a row. That is, a change either updates one or more 
columns of a table for a given row or deletes a row. It is desirable to 
provide these changes in the order of their arrival. One can visualize the 
delivery of these changes as a stream from a Phoenix table to the application, 
initiated by the application in the same way as the delivery of any other 
Phoenix query result. The difference is that a regular query result includes 
at most one result row for each row satisfying the query, and deleted rows are 
not visible in it, whereas the CDC stream/result can include multiple result 
rows for each row and also includes deleted rows. Some use cases also need the 
pre and/or post image of the row along with a change on the row.
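
The contrast between a regular query result and a CDC stream can be 
illustrated with a small in-memory model. This is only a sketch and not 
Phoenix code: the Mutation type, the function names, and the sample data are 
all hypothetical, and timestamps are assumed to be integer milliseconds.

```python
from typing import NamedTuple, Optional


class Mutation(NamedTuple):
    ts: int                   # server-assigned mutation timestamp (assumed ms)
    pk: str                   # data table primary key
    columns: Optional[dict]   # updated columns, or None for a row delete


def regular_query(mutations):
    """Latest visible state: at most one result per row, deleted rows hidden."""
    state = {}
    for m in sorted(mutations, key=lambda m: m.ts):
        if m.columns is None:
            state.pop(m.pk, None)
        else:
            state.setdefault(m.pk, {}).update(m.columns)
    return state


def cdc_stream(mutations, start_ts, end_ts):
    """Every change in [start_ts, end_ts), in arrival order, deletes included."""
    ordered = sorted(mutations, key=lambda m: m.ts)
    return [m for m in ordered if start_ts <= m.ts < end_ts]


# Hypothetical sample data: row1 is updated twice, row2 is written then deleted.
mutations = [
    Mutation(100, "row1", {"a": 1}),
    Mutation(110, "row2", {"a": 5}),
    Mutation(120, "row1", {"a": 2}),
    Mutation(130, "row2", None),
]
```

In this model the regular query sees a single row for row1 and nothing for 
row2, while the CDC stream over the same period returns all four changes, 
including the delete.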

The design proposed here leverages Phoenix Max Lookback and Uncovered Global 
Indexes. The max lookback feature retains recent changes to a table, 
typically the changes made in the last x days. This means that the max 
lookback feature already captures the changes to a given table. Currently, the 
max lookback age is configurable at the cluster level. We need to extend this 
capability so that the max lookback age can be configured at the table level, 
allowing each table to have a different max lookback age based on its CDC 
application requirements.
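
One way to picture a table-level max lookback age is as an override that 
falls back to the cluster-level value. The sketch below is illustrative only: 
the constant names, the table names, and the chosen ages are assumptions and 
do not correspond to actual Phoenix configuration keys.

```python
DAY_MS = 24 * 60 * 60 * 1000

# Hypothetical cluster-level default (3 days).
CLUSTER_MAX_LOOKBACK_MS = 3 * DAY_MS

# Hypothetical per-table overrides, keyed by table name.
TABLE_MAX_LOOKBACK_MS = {"ORDERS": 7 * DAY_MS}


def effective_max_lookback_ms(table: str) -> int:
    """Use the table-level age when one is set, else the cluster-level age."""
    return TABLE_MAX_LOOKBACK_MS.get(table, CLUSTER_MAX_LOOKBACK_MS)


def is_within_lookback(table: str, change_ts_ms: int, now_ms: int) -> bool:
    """A change is still retained if it is no older than the effective age."""
    return now_ms - change_ts_ms <= effective_max_lookback_ms(table)
```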

To deliver the changes in the order of their arrival, we need a time-based 
index. This index should be uncovered because the changes are already retained 
in the table by the max lookback feature. The arrival time will be defined as 
the mutation timestamp generated by the server. An uncovered index would allow 
us to access the changes efficiently and in order. Changes to an index table 
are also preserved by the max lookback feature.
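
The role of the time-based uncovered index can be sketched as a sorted list 
of (timestamp, primary key) pairs that holds no column values, so every index 
hit is resolved against the retained versions in the data table. The data 
layout and names below are hypothetical and only model the idea; they are not 
how Phoenix stores index rows.

```python
import bisect

# Hypothetical data table: pk -> list of (ts, columns) versions, as retained
# by max lookback.
data_table = {
    "row1": [(100, {"a": 1}), (120, {"a": 2})],
    "row2": [(110, {"a": 5})],
}

# Uncovered index: only (ts, pk) keys, sorted by time; no column values.
index = sorted((ts, pk) for pk, versions in data_table.items()
               for ts, _ in versions)


def scan_changes(start_ts, end_ts):
    """Yield changes with start_ts <= ts < end_ts in timestamp order."""
    lo = bisect.bisect_left(index, (start_ts,))
    hi = bisect.bisect_left(index, (end_ts,))
    for ts, pk in index[lo:hi]:
        # The index row only identifies the data table row version; the
        # column values are fetched from the data table itself.
        columns = next(c for t, c in data_table[pk] if t == ts)
        yield ts, pk, columns
```

Because the index is ordered by timestamp first, a time range maps to one 
contiguous slice of index rows.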

A CDC feature can be composed of the following components:
 * {*}CDCUncoveredIndexRegionScanner{*}: This is a server side scanner on an 
uncovered index used for CDC. This can inherit UncoveredIndexRegionScanner. It 
goes through index table rows using a raw scan to identify data table rows and 
retrieves these rows using a raw scan. Using the time range, it forms a JSON 
blob to represent changes to the row including pre and/or post row images.
 * {*}CDC Query Compiler{*}: This is a client side component. It prepares the 
scan object based on the given CDC query statement. 
 * {*}CDC DDL Compiler{*}: This is a client side component. It creates the time 
based uncovered global index based on the given CDC DDL statement and a virtual 
table of CDC type. CDC will be a new table type. 
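
As a rough illustration of what the scanner produces, the sketch below builds 
a JSON document for one change, with optional pre and post row images, from 
the retained versions of a single row. The JSON field names ("change", "pre", 
"post"), the function names, and the integer timestamps are assumptions made 
for this example; the actual JSON format is not specified here.

```python
import json


def image_at(versions, ts):
    """Merge all versions with timestamp <= ts; None if deleted or absent."""
    image = None
    for vts, cols in sorted(versions, key=lambda v: v[0]):
        if vts > ts:
            break
        if cols is None:          # a row delete clears the image
            image = None
        else:
            image = {**(image or {}), **cols}
    return image


def change_json(versions, ts, include=("pre", "post")):
    """Describe the change made at ts, with the requested row images."""
    change = next(cols for vts, cols in versions if vts == ts)
    doc = {"change": change}
    if "pre" in include:
        doc["pre"] = image_at(versions, ts - 1)   # image just before the change
    if "post" in include:
        doc["post"] = image_at(versions, ts)      # image just after the change
    return json.dumps(doc, sort_keys=True)


# Hypothetical versions of one row: insert, partial update, then delete.
versions = [(100, {"a": 1, "b": 1}), (120, {"a": 2}), (130, None)]
```

For the update at timestamp 120, the pre image carries the values written at 
100 and the post image merges in the updated column; for the delete at 130, 
the post image is null.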

A CDC DDL syntax to create CDC on a (data) table can be as follows: 

Create CDC  on  INCLUDE (pre | post)  
SALT_BUCKETS=

The above CDC DDL creates a virtual CDC table and an uncovered index. The CDC 
table PK columns start with the timestamp and continue with the data table PK 
columns. The CDC table includes one non-PK column which is a JSON column. The 
change is expressed in this JSON column in multiple ways based on the CDC DDL 
or query statement. The change can be expressed as just the mutation for the 
change, the pre image of the row (the image before the change), the post image, 
or any combination of these. The CDC table is not a physical table on disk. It 
is just a virtual table to be used in a CDC query. Phoenix stores just the 
metadata for this virtual table. 

A CDC query can be as follows:

Select * from  where PHOENIX_ROW_TIMESTAMP() >= TO_DATE( …) AND 
PHOENIX_ROW_TIMESTAMP() < TO_DATE( …)

This query would return the rows of the CDC table, which are constructed on 
the server side by CDCUncoveredIndexRegionScanner by joining the uncovered 
index row versions with the corresponding data table row versions (using raw 
scans). A new CDC hint can be added to the above select query to return just 
the actual change, the pre image, or the post image of the row, or a 
combination of them, overriding the default JSON column format defined by the 
CDC DDL statement. 
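
Since the query uses an inclusive lower bound (>=) and an exclusive upper 
bound (<), consecutive time ranges can share a boundary without any change 
being returned twice or skipped. A minimal sketch of generating such ranges 
follows; the function name and the fixed step size are assumptions for 
illustration, and timestamps are plain integers.

```python
def windows(start_ms, end_ms, step_ms):
    """Yield consecutive half-open ranges [lower, upper) covering [start, end)."""
    lower = start_ms
    while lower < end_ms:
        upper = min(lower + step_ms, end_ms)
        yield lower, upper
        lower = upper   # the next range starts exactly where this one ended


def window_for(ts, ranges):
    """Return the ranges a timestamp falls into under >= lower and < upper."""
    return [(lo, hi) for lo, hi in ranges if lo <= ts < hi]
```

A timestamp that sits exactly on a boundary belongs only to the range that 
starts there.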

The CDC application will run the above query in a loop. When the difference 
between the current time of the application and the upper limit of the time 
range of the query becomes less than s milliseconds, say x 

[jira] [Updated] (PHOENIX-7001) Change Data Capture leveraging Max Lookback and Uncovered Indexes

2024-01-23 Thread Kadir Ozdemir (Jira)


 [ 
https://issues.apache.org/jira/browse/PHOENIX-7001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kadir Ozdemir updated PHOENIX-7001:
---
Description: 
The use cases for a Change Data Capture (CDC) feature center on capturing 
changes to a given table (or updatable view) as these changes happen in near 
real time. A CDC application can retrieve changes in real time or with some 
delay, or even retrieve the same set of changes multiple times. This means 
the CDC use case can be generalized as time range queries where the time range 
is typically short, such as the last x minutes or hours, or is expressed as a 
specific time range within the last n days, where n is typically less than 7.

A change is an update to a row. That is, a change either updates one or more 
columns of a table for a given row or deletes a row. It is desirable to 
provide these changes in the order of their arrival. One can visualize the 
delivery of these changes as a stream from a Phoenix table to the application, 
initiated by the application in the same way as the delivery of any other 
Phoenix query result. The difference is that a regular query result includes 
at most one result row for each row satisfying the query, and deleted rows are 
not visible in it, whereas the CDC stream/result can include multiple result 
rows for each row and also includes deleted rows. Some use cases also need the 
pre and/or post image of the row along with a change on the row.

The design proposed here leverages Phoenix Max Lookback and Uncovered (Global 
or Local) Indexes. The max lookback feature retains recent changes to a table, 
typically the changes made in the last x days. This means that the max 
lookback feature already captures the changes to a given table. Currently, the 
max lookback age is configurable at the cluster level. We need to extend this 
capability so that the max lookback age can be configured at the table level, 
allowing each table to have a different max lookback age based on its CDC 
application requirements.

To deliver the changes in the order of their arrival, we need a time-based 
index. This index should be uncovered because the changes are already retained 
in the table by the max lookback feature. The arrival time will be defined as 
the mutation timestamp generated by the server. An uncovered index would allow 
us to access the changes efficiently and in order. Changes to an index table 
are also preserved by the max lookback feature.

A CDC feature can be composed of the following components:
 * {*}CDCUncoveredIndexRegionScanner{*}: This is a server side scanner on an 
uncovered index used for CDC. This can inherit UncoveredIndexRegionScanner. It 
goes through index table rows using a raw scan to identify data table rows and 
retrieves these rows using a raw scan. Using the time range, it forms a JSON 
blob to represent changes to the row including pre and/or post row images.
 * {*}CDC Query Compiler{*}: This is a client side component. It prepares the 
scan object based on the given CDC query statement. 
 * {*}CDC DDL Compiler{*}: This is a client side component. It creates the time 
based uncovered (global/local) index based on the given CDC DDL statement and a 
virtual table of CDC type. CDC will be a new table type. 

A CDC DDL syntax to create CDC on a (data) table can be as follows: 

Create CDC  on  INCLUDE (pre | post | latest | 
all) INDEX =  SALT_BUCKETS=

The above CDC DDL creates a virtual CDC table and an uncovered index. The CDC 
table PK columns start with the timestamp and continue with the data table PK 
columns. The CDC table includes one non-PK column which is a JSON column. The 
change is expressed in this JSON column in multiple ways based on the CDC DDL 
or query statement. The change can be expressed as just the mutation for the 
change, the latest image of the row, the pre image of the row (the image before 
the change), the post image, or any combination of these. The CDC table is not 
a physical table on disk. It is just a virtual table to be used in a CDC query. 
Phoenix stores just the metadata for this virtual table. 

A CDC query can be as follows:

Select * from  where PHOENIX_ROW_TIMESTAMP() >= TO_DATE( …) AND 
PHOENIX_ROW_TIMESTAMP() < TO_DATE( …)

This query would return the rows of the CDC table, which are constructed on 
the server side by CDCUncoveredIndexRegionScanner by joining the uncovered 
index row versions with the corresponding data table row versions (using raw 
scans). A new CDC hint can be added to the above select query to return just 
the actual change, the pre, post, or latest image of the row, or a 
combination of them, overriding the default JSON column format defined by the 
CDC DDL statement. 

The CDC application will run the above query in a loop. When the difference 
between the current time of the application and the upper limit 

[jira] [Updated] (PHOENIX-7001) Change Data Capture leveraging Max Lookback and Uncovered Indexes

2024-01-23 Thread Kadir Ozdemir (Jira)


 [ 
https://issues.apache.org/jira/browse/PHOENIX-7001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kadir Ozdemir updated PHOENIX-7001:
---
Description: 
The use cases for a Change Data Capture (CDC) feature center on capturing 
changes to a given table (or updatable view) as these changes happen in near 
real time. A CDC application can retrieve changes in real time or with some 
delay, or even retrieve the same set of changes multiple times. This means 
the CDC use case can be generalized as time range queries where the time range 
is typically short, such as the last x minutes or hours, or is expressed as a 
specific time range within the last n days, where n is typically less than 7.

A change is an update to a row. That is, a change either updates one or more 
columns of a table for a given row or deletes a row. It is desirable to 
provide these changes in the order of their arrival. One can visualize the 
delivery of these changes as a stream from a Phoenix table to the application, 
initiated by the application in the same way as the delivery of any other 
Phoenix query result. The difference is that a regular query result includes 
at most one result row for each row satisfying the query, and deleted rows are 
not visible in it, whereas the CDC stream/result can include multiple result 
rows for each row and also includes deleted rows. Some use cases also need the 
pre and/or post image of the row along with a change on the row.

The design proposed here leverages Phoenix Max Lookback and Uncovered (Global 
or Local) Indexes. The max lookback feature retains recent changes to a table, 
typically the changes made in the last x days. This means that the max 
lookback feature already captures the changes to a given table. Currently, the 
max lookback age is configurable at the cluster level. We need to extend this 
capability so that the max lookback age can be configured at the table level, 
allowing each table to have a different max lookback age based on its CDC 
application requirements.

To deliver the changes in the order of their arrival, we need a time-based 
index. This index should be uncovered because the changes are already retained 
in the table by the max lookback feature. The arrival time will be defined as 
the mutation timestamp generated by the server. An uncovered index would allow 
us to access the changes efficiently and in order. Changes to an index table 
are also preserved by the max lookback feature.

A CDC feature can be composed of the following components:
 * {*}CDCUncoveredIndexRegionScanner{*}: This is a server side scanner on an 
uncovered index used for CDC. This can inherit UncoveredIndexRegionScanner. It 
goes through index table rows using a raw scan to identify data table rows and 
retrieves these rows using a raw scan. Using the time range, it forms a JSON 
blob to represent changes to the row including pre and/or post row images.
 * {*}CDC Query Compiler{*}: This is a client side component. It prepares the 
scan object based on the given CDC query statement. 
 * {*}CDC DDL Compiler{*}: This is a client side component. It creates the time 
based uncovered (global/local) index based on the given CDC DDL statement and a 
virtual table of CDC type. CDC will be a new table type. 

A CDC DDL syntax to create CDC on a (data) table can be as follows: 

Create CDC  on  INCLUDE (pre | post | latest | 
all) MAX_LOOKBACK_AGE =  INDEX =  
SALT_BUCKETS=

The above CDC DDL creates a virtual CDC table and an uncovered index. The CDC 
table PK columns start with the timestamp and continue with the data table PK 
columns. The CDC table includes one non-PK column which is a JSON column. The 
change is expressed in this JSON column in multiple ways based on the CDC DDL 
or query statement. The change can be expressed as just the mutation for the 
change, the latest image of the row, the pre image of the row (the image before 
the change), the post image, or any combination of these. The CDC table is not 
a physical table on disk. It is just a virtual table to be used in a CDC query. 
Phoenix stores just the metadata for this virtual table. 

A CDC query can be as follows:

Select * from  where PHOENIX_ROW_TIMESTAMP() >= TO_DATE( …) AND 
PHOENIX_ROW_TIMESTAMP() < TO_DATE( …)

This query would return the rows of the CDC table, which are constructed on 
the server side by CDCUncoveredIndexRegionScanner by joining the uncovered 
index row versions with the corresponding data table row versions (using raw 
scans). A new CDC hint can be added to the above select query to return just 
the actual change, the pre, post, or latest image of the row, or a 
combination of them, overriding the default JSON column format defined by the 
CDC DDL statement. 

The CDC application will run the above query in a loop. When the difference 
between the current time of the application 

[jira] [Updated] (PHOENIX-7001) Change Data Capture leveraging Max Lookback and Uncovered Indexes

2024-01-21 Thread Kadir Ozdemir (Jira)


 [ 
https://issues.apache.org/jira/browse/PHOENIX-7001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kadir Ozdemir updated PHOENIX-7001:
---
Description: 
The use cases for a Change Data Capture (CDC) feature center on capturing 
changes to a given table (or updatable view) as these changes happen in near 
real time. A CDC application can retrieve changes in real time or with some 
delay, or even retrieve the same set of changes multiple times. This means 
the CDC use case can be generalized as time range queries where the time range 
is typically short, such as the last x minutes or hours, or is expressed as a 
specific time range within the last n days, where n is typically less than 7.

A change is an update to a row. That is, a change either updates one or more 
columns of a table for a given row or deletes a row. It is desirable to 
provide these changes in the order of their arrival. One can visualize the 
delivery of these changes as a stream from a Phoenix table to the application, 
initiated by the application in the same way as the delivery of any other 
Phoenix query result. The difference is that a regular query result includes 
at most one result row for each row satisfying the query, and deleted rows are 
not visible in it, whereas the CDC stream/result can include multiple result 
rows for each row and also includes deleted rows. Some use cases also need the 
pre and/or post image of the row along with a change on the row.

The design proposed here leverages Phoenix Max Lookback and Uncovered (Global 
or Local) Indexes. The max lookback feature retains recent changes to a table, 
typically the changes made in the last x days. This means that the max 
lookback feature already captures the changes to a given table. Currently, the 
max lookback age is configurable at the cluster level. We need to extend this 
capability so that the max lookback age can be configured at the table level, 
allowing each table to have a different max lookback age based on its CDC 
application requirements.

To deliver the changes in the order of their arrival, we need a time-based 
index. This index should be uncovered because the changes are already retained 
in the table by the max lookback feature. The arrival time will be defined as 
the mutation timestamp generated by the server. An uncovered index would allow 
us to access the changes efficiently and in order. Changes to an index table 
are also preserved by the max lookback feature.

A CDC feature can be composed of the following components:
 * {*}CDCUncoveredIndexRegionScanner{*}: This is a server side scanner on an 
uncovered index used for CDC. This can inherit UncoveredIndexRegionScanner. It 
goes through index table rows using a raw scan to identify data table rows and 
retrieves these rows using a raw scan. Using the time range, it forms a JSON 
blob to represent changes to the row including pre and/or post row images.
 * {*}CDC Query Compiler{*}: This is a client side component. It prepares the 
scan object based on the given CDC query statement. 
 * {*}CDC DDL Compiler{*}: This is a client side component. It creates the time 
based uncovered (global/local) index based on the given CDC DDL statement and a 
virtual table of CDC type. CDC will be a new table type. 

A CDC DDL syntax to create CDC on a (data) table can be as follows: 

Create CDC  on  INCLUDE (pre | post | latest | 
all) TTL =  INDEX =  SALT_BUCKETS=

The above CDC DDL creates a virtual CDC table and an uncovered index. The CDC 
table PK columns start with the timestamp and continue with the data table PK 
columns. The CDC table includes one non-PK column which is a JSON column. The 
change is expressed in this JSON column in multiple ways based on the CDC DDL 
or query statement. The change can be expressed as just the mutation for the 
change, the latest image of the row, the pre image of the row (the image before 
the change), the post image, or any combination of these. The CDC table is not 
a physical table on disk. It is just a virtual table to be used in a CDC query. 
Phoenix stores just the metadata for this virtual table. 

A CDC query can be as follows:

Select * from  where PHOENIX_ROW_TIMESTAMP() >= TO_DATE( …) AND 
PHOENIX_ROW_TIMESTAMP() < TO_DATE( …)

This query would return the rows of the CDC table, which are constructed on 
the server side by CDCUncoveredIndexRegionScanner by joining the uncovered 
index row versions with the corresponding data table row versions (using raw 
scans). A new CDC hint can be added to the above select query to return just 
the actual change, the pre, post, or latest image of the row, or a 
combination of them, overriding the default JSON column format defined by the 
CDC DDL statement. 

The CDC application will run the above query in a loop. When the difference 
between the current time of the application and the upper