haridsv commented on code in PR #1794:
URL: https://github.com/apache/phoenix/pull/1794#discussion_r1457249404


##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell> 
result) throws IOException {
         }
         return false;
     }
+
+    private Result getCDCImage(
+            Map<ImmutableBytesPtr, Cell> preImageObj,
+            Map<ImmutableBytesPtr, Cell> changeImageObj,
+            boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+        Map<String, Object> rowValueMap = new HashMap<>();
+
+        Map<String, Object> preImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {

Review Comment:
   ```suggestion
                   if (dataColQualNameMap.containsKey(preImageObjCell.getKey())) {
   ```



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell> 
result) throws IOException {
         }
         return false;
     }
+
+    private Result getCDCImage(
+            Map<ImmutableBytesPtr, Cell> preImageObj,
+            Map<ImmutableBytesPtr, Cell> changeImageObj,
+            boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+        Map<String, Object> rowValueMap = new HashMap<>();
+
+        Map<String, Object> preImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+                    
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                    
preImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(PRE_IMAGE, preImage);
+        }
+
+        Map<String, Object> changeImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+                    : changeImageObj.entrySet()) {
+                if (dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                    
changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                    
changeImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(CHANGE_IMAGE, changeImage);
+        }
+
+        Map<String, Object> postImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) {
+            if (!isIndexCellDeleteRow) {
+                for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell
+                        : preImageObj.entrySet()) {
+                    if (dataColQualNameMap.get(preImageObjCell.getKey()) != 
null) {
+                        
postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                        
preImageObjCell.getValue().getValueArray()));
+                    }
+                }
+                for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+                        : changeImageObj.entrySet()) {
+                    if (dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                        
postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                        
changeImageObjCell.getValue().getValueArray()));
+                    }
+                }
+            }
+            rowValueMap.put(POST_IMAGE, postImage);
+        }
+
+        if (isIndexCellDeleteRow) {
+            rowValueMap.put(EVENT_TYPE, DELETE_EVENT_TYPE);
+        } else {
+            rowValueMap.put(EVENT_TYPE, UPSERT_EVENT_TYPE);
+        }
+
+        byte[] value =
+                new 
Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8);
+        CellBuilder builder = 
CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
+        Result cdcRow = Result.create(Arrays.asList(builder.
+                setRow(indexToDataRowKeyMap.get(new 
ImmutableBytesPtr(firstCell.getRowArray(),
+                        firstCell.getRowOffset(), firstCell.getRowLength())
+                        .copyBytesIfNecessary())).
+                setFamily(firstCell.getFamilyArray()).
+                setQualifier(scan.getAttribute(CDC_JSON_COL_QUALIFIER)).
+                setTimestamp(indexCellTS).
+                setValue(value).
+                setType(Cell.Type.Put).
+                build()));
+
+        return cdcRow;
+    }
+
+    @Override
+    protected void scanDataTableRows(long startTime) throws IOException {
+        super.scanDataTableRows(startTime);
+        List<List<Cell>> indexRowList = new ArrayList<>();
+        // Creating new Index Rows for Delete Row events
+        for (int rowIndex = 0; rowIndex < indexRows.size(); rowIndex++) {
+            List<Cell> indexRow = indexRows.get(rowIndex);
+            indexRowList.add(indexRow);
+            if (indexRow.size() > 1) {
+                List<Cell> deleteRow = null;
+                for (int cellIndex = indexRow.size() - 1; cellIndex >= 0; 
cellIndex--) {
+                    Cell cell = indexRow.get(cellIndex);
+                    if (cell.getType() == Cell.Type.DeleteFamily) {
+                        byte[] indexRowKey = new 
ImmutableBytesPtr(cell.getRowArray(),
+                                cell.getRowOffset(), cell.getRowLength())
+                                .copyBytesIfNecessary();
+                        ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                                indexToDataRowKeyMap.get(indexRowKey));
+                        Result dataRow = dataRows.get(dataRowKey);
+                        for (Cell dataRowCell : dataRow.rawCells()) {
+                            // Note: Upsert adds delete family marker in the 
index table but not in the datatable.
+                            // Delete operation adds delete family marker in 
datatable as well as index table.
+                            if (dataRowCell.getType() == Cell.Type.DeleteFamily
+                                    && dataRowCell.getTimestamp() == 
cell.getTimestamp()) {
+                                if (deleteRow == null) {
+                                    deleteRow = new ArrayList<>();
+                                }
+                                deleteRow.add(cell);
+                                indexRowList.add(deleteRow);

Review Comment:
   If you change the deleteRow to be a boolean flag, this can just be:
   ```suggestion
                                   indexRowList.add(Arrays.asList(cell));
   ```



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell> 
result) throws IOException {
         }
         return false;
     }
+
+    private Result getCDCImage(
+            Map<ImmutableBytesPtr, Cell> preImageObj,
+            Map<ImmutableBytesPtr, Cell> changeImageObj,
+            boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+        Map<String, Object> rowValueMap = new HashMap<>();
+
+        Map<String, Object> preImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+                    
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                    
preImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(PRE_IMAGE, preImage);
+        }
+
+        Map<String, Object> changeImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+                    : changeImageObj.entrySet()) {
+                if (dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {

Review Comment:
   ```suggestion
                   if (dataColQualNameMap.containsKey(changeImageObjCell.getKey())) {
   ```



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell> 
result) throws IOException {
         }
         return false;
     }
+
+    private Result getCDCImage(
+            Map<ImmutableBytesPtr, Cell> preImageObj,
+            Map<ImmutableBytesPtr, Cell> changeImageObj,
+            boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+        Map<String, Object> rowValueMap = new HashMap<>();
+
+        Map<String, Object> preImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+                    
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                    
preImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(PRE_IMAGE, preImage);
+        }
+
+        Map<String, Object> changeImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+                    : changeImageObj.entrySet()) {
+                if (dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                    
changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                    
changeImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(CHANGE_IMAGE, changeImage);
+        }
+
+        Map<String, Object> postImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) {
+            if (!isIndexCellDeleteRow) {
+                for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell
+                        : preImageObj.entrySet()) {
+                    if (dataColQualNameMap.get(preImageObjCell.getKey()) != 
null) {
+                        
postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                        
preImageObjCell.getValue().getValueArray()));
+                    }
+                }
+                for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+                        : changeImageObj.entrySet()) {
+                    if (dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                        
postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                        
changeImageObjCell.getValue().getValueArray()));
+                    }
+                }
+            }

Review Comment:
   Instead of using a separate loop for building postImage, why not populate it
as part of both the pre and change image loops? I mean, you would repeat the
lines 201 and 215 for postImage as well.



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell> 
result) throws IOException {
         }
         return false;
     }
+
+    private Result getCDCImage(
+            Map<ImmutableBytesPtr, Cell> preImageObj,
+            Map<ImmutableBytesPtr, Cell> changeImageObj,
+            boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+        Map<String, Object> rowValueMap = new HashMap<>();
+
+        Map<String, Object> preImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+                    
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                    
preImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(PRE_IMAGE, preImage);
+        }
+
+        Map<String, Object> changeImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+                    : changeImageObj.entrySet()) {
+                if (dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                    
changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                    
changeImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(CHANGE_IMAGE, changeImage);
+        }
+
+        Map<String, Object> postImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) {
+            if (!isIndexCellDeleteRow) {
+                for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell
+                        : preImageObj.entrySet()) {
+                    if (dataColQualNameMap.get(preImageObjCell.getKey()) != 
null) {
+                        
postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                        
preImageObjCell.getValue().getValueArray()));
+                    }
+                }
+                for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+                        : changeImageObj.entrySet()) {
+                    if (dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                        
postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                        
changeImageObjCell.getValue().getValueArray()));
+                    }
+                }
+            }
+            rowValueMap.put(POST_IMAGE, postImage);
+        }
+
+        if (isIndexCellDeleteRow) {
+            rowValueMap.put(EVENT_TYPE, DELETE_EVENT_TYPE);
+        } else {
+            rowValueMap.put(EVENT_TYPE, UPSERT_EVENT_TYPE);
+        }
+
+        byte[] value =
+                new 
Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8);
+        CellBuilder builder = 
CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
+        Result cdcRow = Result.create(Arrays.asList(builder.
+                setRow(indexToDataRowKeyMap.get(new 
ImmutableBytesPtr(firstCell.getRowArray(),
+                        firstCell.getRowOffset(), firstCell.getRowLength())
+                        .copyBytesIfNecessary())).
+                setFamily(firstCell.getFamilyArray()).
+                setQualifier(scan.getAttribute(CDC_JSON_COL_QUALIFIER)).
+                setTimestamp(indexCellTS).
+                setValue(value).
+                setType(Cell.Type.Put).
+                build()));
+
+        return cdcRow;
+    }
+
+    @Override
+    protected void scanDataTableRows(long startTime) throws IOException {
+        super.scanDataTableRows(startTime);
+        List<List<Cell>> indexRowList = new ArrayList<>();
+        // Creating new Index Rows for Delete Row events
+        for (int rowIndex = 0; rowIndex < indexRows.size(); rowIndex++) {
+            List<Cell> indexRow = indexRows.get(rowIndex);
+            indexRowList.add(indexRow);
+            if (indexRow.size() > 1) {
+                List<Cell> deleteRow = null;

Review Comment:
   Nit: Outside the loop you are just using this as a flag, so why not use a 
boolean flag here and make use of a local list at line 294?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to