haridsv commented on code in PR #1794: URL: https://github.com/apache/phoenix/pull/1794#discussion_r1457249404
########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { } return false; } + + private Result getCDCImage( + Map<ImmutableBytesPtr, Cell> preImageObj, + Map<ImmutableBytesPtr, Cell> changeImageObj, + boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) { + Map<String, Object> rowValueMap = new HashMap<>(); + + Map<String, Object> preImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) { + for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : preImageObj.entrySet()) { + if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) { Review Comment: ```suggestion if (dataColQualNameMap.containsKey(preImageObjCell.getKey())) { ``` ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { } return false; } + + private Result getCDCImage( + Map<ImmutableBytesPtr, Cell> preImageObj, + Map<ImmutableBytesPtr, Cell> changeImageObj, + boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) { + Map<String, Object> rowValueMap = new HashMap<>(); + + Map<String, Object> preImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) { + for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : preImageObj.entrySet()) { + if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) { + preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()), + dataColQualTypeMap.get(preImageObjCell.getKey()).toObject( + preImageObjCell.getValue().getValueArray())); + } + } + rowValueMap.put(PRE_IMAGE, preImage); + } + + Map<String, Object> changeImage = new HashMap<>(); + if 
(this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) { + for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell + : changeImageObj.entrySet()) { + if (dataColQualNameMap.get(changeImageObjCell.getKey()) != null) { + changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()), + dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject( + changeImageObjCell.getValue().getValueArray())); + } + } + rowValueMap.put(CHANGE_IMAGE, changeImage); + } + + Map<String, Object> postImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) { + if (!isIndexCellDeleteRow) { + for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell + : preImageObj.entrySet()) { + if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) { + postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()), + dataColQualTypeMap.get(preImageObjCell.getKey()).toObject( + preImageObjCell.getValue().getValueArray())); + } + } + for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell + : changeImageObj.entrySet()) { + if (dataColQualNameMap.get(changeImageObjCell.getKey()) != null) { + postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()), + dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject( + changeImageObjCell.getValue().getValueArray())); + } + } + } + rowValueMap.put(POST_IMAGE, postImage); + } + + if (isIndexCellDeleteRow) { + rowValueMap.put(EVENT_TYPE, DELETE_EVENT_TYPE); + } else { + rowValueMap.put(EVENT_TYPE, UPSERT_EVENT_TYPE); + } + + byte[] value = + new Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8); + CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); + Result cdcRow = Result.create(Arrays.asList(builder. + setRow(indexToDataRowKeyMap.get(new ImmutableBytesPtr(firstCell.getRowArray(), + firstCell.getRowOffset(), firstCell.getRowLength()) + .copyBytesIfNecessary())). 
+ setFamily(firstCell.getFamilyArray()). + setQualifier(scan.getAttribute(CDC_JSON_COL_QUALIFIER)). + setTimestamp(indexCellTS). + setValue(value). + setType(Cell.Type.Put). + build())); + + return cdcRow; + } + + @Override + protected void scanDataTableRows(long startTime) throws IOException { + super.scanDataTableRows(startTime); + List<List<Cell>> indexRowList = new ArrayList<>(); + // Creating new Index Rows for Delete Row events + for (int rowIndex = 0; rowIndex < indexRows.size(); rowIndex++) { + List<Cell> indexRow = indexRows.get(rowIndex); + indexRowList.add(indexRow); + if (indexRow.size() > 1) { + List<Cell> deleteRow = null; + for (int cellIndex = indexRow.size() - 1; cellIndex >= 0; cellIndex--) { + Cell cell = indexRow.get(cellIndex); + if (cell.getType() == Cell.Type.DeleteFamily) { + byte[] indexRowKey = new ImmutableBytesPtr(cell.getRowArray(), + cell.getRowOffset(), cell.getRowLength()) + .copyBytesIfNecessary(); + ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( + indexToDataRowKeyMap.get(indexRowKey)); + Result dataRow = dataRows.get(dataRowKey); + for (Cell dataRowCell : dataRow.rawCells()) { + // Note: Upsert adds delete family marker in the index table but not in the datatable. + // Delete operation adds delete family marker in datatable as well as index table. 
+ if (dataRowCell.getType() == Cell.Type.DeleteFamily + && dataRowCell.getTimestamp() == cell.getTimestamp()) { + if (deleteRow == null) { + deleteRow = new ArrayList<>(); + } + deleteRow.add(cell); + indexRowList.add(deleteRow); Review Comment: If you change the deleteRow to be a boolean flag, this can just be: ```suggestion indexRowList.add(Arrays.asList(cell)); ``` ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { } return false; } + + private Result getCDCImage( + Map<ImmutableBytesPtr, Cell> preImageObj, + Map<ImmutableBytesPtr, Cell> changeImageObj, + boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) { + Map<String, Object> rowValueMap = new HashMap<>(); + + Map<String, Object> preImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) { + for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : preImageObj.entrySet()) { + if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) { + preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()), + dataColQualTypeMap.get(preImageObjCell.getKey()).toObject( + preImageObjCell.getValue().getValueArray())); + } + } + rowValueMap.put(PRE_IMAGE, preImage); + } + + Map<String, Object> changeImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) { + for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell + : changeImageObj.entrySet()) { + if (dataColQualNameMap.get(changeImageObjCell.getKey()) != null) { Review Comment: ```suggestion if (dataColQualNameMap.containsKey(changeImageObjCell.getKey())) { ``` ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -223,4 +186,126 @@ protected boolean 
getNextCoveredIndexRow(List<Cell> result) throws IOException { } return false; } + + private Result getCDCImage( + Map<ImmutableBytesPtr, Cell> preImageObj, + Map<ImmutableBytesPtr, Cell> changeImageObj, + boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) { + Map<String, Object> rowValueMap = new HashMap<>(); + + Map<String, Object> preImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) { + for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : preImageObj.entrySet()) { + if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) { + preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()), + dataColQualTypeMap.get(preImageObjCell.getKey()).toObject( + preImageObjCell.getValue().getValueArray())); + } + } + rowValueMap.put(PRE_IMAGE, preImage); + } + + Map<String, Object> changeImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) { + for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell + : changeImageObj.entrySet()) { + if (dataColQualNameMap.get(changeImageObjCell.getKey()) != null) { + changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()), + dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject( + changeImageObjCell.getValue().getValueArray())); + } + } + rowValueMap.put(CHANGE_IMAGE, changeImage); + } + + Map<String, Object> postImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) { + if (!isIndexCellDeleteRow) { + for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell + : preImageObj.entrySet()) { + if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) { + postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()), + dataColQualTypeMap.get(preImageObjCell.getKey()).toObject( + preImageObjCell.getValue().getValueArray())); + } + } + for 
(Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell + : changeImageObj.entrySet()) { + if (dataColQualNameMap.get(changeImageObjCell.getKey()) != null) { + postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()), + dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject( + changeImageObjCell.getValue().getValueArray())); + } + } + } Review Comment: Instead of using a separate loop for building postImage, why not do it as part of both the pre and change image? I mean, you would repeat the lines 201 and 215 for postImage as well. ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { } return false; } + + private Result getCDCImage( + Map<ImmutableBytesPtr, Cell> preImageObj, + Map<ImmutableBytesPtr, Cell> changeImageObj, + boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) { + Map<String, Object> rowValueMap = new HashMap<>(); + + Map<String, Object> preImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) { + for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : preImageObj.entrySet()) { + if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) { + preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()), + dataColQualTypeMap.get(preImageObjCell.getKey()).toObject( + preImageObjCell.getValue().getValueArray())); + } + } + rowValueMap.put(PRE_IMAGE, preImage); + } + + Map<String, Object> changeImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) { + for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell + : changeImageObj.entrySet()) { + if (dataColQualNameMap.get(changeImageObjCell.getKey()) != null) { + changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()), + 
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject( + changeImageObjCell.getValue().getValueArray())); + } + } + rowValueMap.put(CHANGE_IMAGE, changeImage); + } + + Map<String, Object> postImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) { + if (!isIndexCellDeleteRow) { + for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell + : preImageObj.entrySet()) { + if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) { + postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()), + dataColQualTypeMap.get(preImageObjCell.getKey()).toObject( + preImageObjCell.getValue().getValueArray())); + } + } + for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell + : changeImageObj.entrySet()) { + if (dataColQualNameMap.get(changeImageObjCell.getKey()) != null) { + postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()), + dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject( + changeImageObjCell.getValue().getValueArray())); + } + } + } + rowValueMap.put(POST_IMAGE, postImage); + } + + if (isIndexCellDeleteRow) { + rowValueMap.put(EVENT_TYPE, DELETE_EVENT_TYPE); + } else { + rowValueMap.put(EVENT_TYPE, UPSERT_EVENT_TYPE); + } + + byte[] value = + new Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8); + CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); + Result cdcRow = Result.create(Arrays.asList(builder. + setRow(indexToDataRowKeyMap.get(new ImmutableBytesPtr(firstCell.getRowArray(), + firstCell.getRowOffset(), firstCell.getRowLength()) + .copyBytesIfNecessary())). + setFamily(firstCell.getFamilyArray()). + setQualifier(scan.getAttribute(CDC_JSON_COL_QUALIFIER)). + setTimestamp(indexCellTS). + setValue(value). + setType(Cell.Type.Put). 
+ build())); + + return cdcRow; + } + + @Override + protected void scanDataTableRows(long startTime) throws IOException { + super.scanDataTableRows(startTime); + List<List<Cell>> indexRowList = new ArrayList<>(); + // Creating new Index Rows for Delete Row events + for (int rowIndex = 0; rowIndex < indexRows.size(); rowIndex++) { + List<Cell> indexRow = indexRows.get(rowIndex); + indexRowList.add(indexRow); + if (indexRow.size() > 1) { + List<Cell> deleteRow = null; Review Comment: Nit: Outside the loop you are just using this as a flag, so why not use a boolean flag here and make use of a local list at 294? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org