haridsv commented on code in PR #1813: URL: https://github.com/apache/phoenix/pull/1813#discussion_r1477029813
########## phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java: ########## @@ -110,4 +109,25 @@ public static Scan initForRawScan(Scan scan) { } return scan; } + + public static int compareCellFamilyAndQualifier(byte[] columnFamily1, + byte[] columnQual1, + byte[] columnFamily2, + byte[] columnQual2) { + int familyNameComparison = CDCUtil.compare(columnFamily1, columnFamily2); + if (familyNameComparison != 0) { + return familyNameComparison; + } + return CDCUtil.compare(columnQual1, columnQual2); Review Comment: Why not use `Arrays.compare()`? I see its use in Protobuf too so it must be efficient. ```suggestion return Arrays.compare(columnQual1, 0, columnQual1.length, columnQual2, 0, columnQual2.length); ``` ########## phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java: ########## @@ -1332,100 +1326,8 @@ public static void setScanAttributesForClient(Scan scan, PTable table, scan.setAttribute(CDC_INCLUDE_SCOPES, context.getEncodedCdcIncludeScopes().getBytes(StandardCharsets.UTF_8)); Review Comment: We should stuff this also into CDCTableInfo and let protobuf take care of it. ########## phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java: ########## @@ -308,6 +308,14 @@ enum JoinType {INNER, LEFT_OUTER} String CDC_JSON_COL_NAME = "CDC JSON"; + String EVENT_TYPE = "event_type"; + String PRE_IMAGE = "pre_image"; + String POST_IMAGE = "post_image"; + String CHANGE_IMAGE = "change_image"; + String UPSERT_EVENT_TYPE = "upsert"; + String DELETE_EVENT_TYPE = "delete"; Review Comment: I think we should put a CDC_ prefix on all of these keys. 
########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -223,4 +237,85 @@ protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { } return false; } + + private Result getCDCImage( + Map<String, Map<String, Object>> preImageObj, + Map<String, Map<String, Object>> changeImageObj, + boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) { + Map<String, Object> rowValueMap = new HashMap<>(); + + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) { + rowValueMap.put(PRE_IMAGE, preImageObj); + } + + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) { + rowValueMap.put(CHANGE_IMAGE, changeImageObj); + } + + Map<String, Map<String, Object>> postImageObj = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) { + if (!isIndexCellDeleteRow) { + for (Map.Entry<String, Map<String, Object>> preImageObjFamily + : preImageObj.entrySet()) { + String columnFamily = preImageObjFamily.getKey(); + postImageObj.put(columnFamily, new HashMap<>()); + for (Map.Entry<String, Object> preImageColQual : + preImageObjFamily.getValue().entrySet()) { + postImageObj.get(columnFamily).put(preImageColQual.getKey(), + preImageColQual.getValue()); + } + } + for (Map.Entry<String, Map<String, Object>> changeImageObjFamily + : changeImageObj.entrySet()) { + String columnFamily = changeImageObjFamily.getKey(); + if (!postImageObj.containsKey(columnFamily)) { + postImageObj.put(columnFamily, new HashMap<>()); + } + for (Map.Entry<String, Object> changeImageColQual : + changeImageObjFamily.getValue().entrySet()) { + postImageObj.get(columnFamily).put(changeImageColQual.getKey(), + changeImageColQual.getValue()); Review Comment: Again, clone the original map and use putAll. 
########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner, super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, queryLimit); CDCUtil.initForRawScan(dataTableScan); - dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP)); - dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP)); + cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef + .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF))); + Charset utf8Charset = StandardCharsets.UTF_8; + String cdcChangeScopeStr = utf8Charset.decode( + ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString(); + try { + cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr); + } catch (SQLException e) { + throw new RuntimeException(e); + } } @Override protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException { + //TODO: Get Timerange from the start row and end row of the index scan object + // and set it in the datatable scan object. 
+// if (scan.getStartRow().length == 8) { +// startTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStartRow(), 0, SortOrder.getDefault()); +// } +// if (scan.getStopRow().length == 8) { +// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStopRow(), 0, SortOrder.getDefault()); +// } return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true)); } protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { if (indexRowIterator.hasNext()) { List<Cell> indexRow = indexRowIterator.next(); - for (Cell c: indexRow) { - if (c.getType() == Cell.Type.Put) { - result.add(c); - } - } + Cell firstCell = indexRow.get(indexRow.size() - 1); + byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), + firstCell.getRowOffset(), firstCell.getRowLength()) + .copyBytesIfNecessary(); + ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( + indexToDataRowKeyMap.get(indexRowKey)); + Result dataRow = dataRows.get(dataRowKey); + Long indexCellTS = firstCell.getTimestamp(); + Map<String, Map<String, Object>> preImageObj = new HashMap<>(); + Map<String, Map<String, Object>> changeImageObj = new HashMap<>(); + Long lowerBoundForPreImage = 0L; + boolean isIndexCellDeleteRow = false; + byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); try { - Result dataRow = null; - if (! 
result.isEmpty()) { - Cell firstCell = result.get(0); - byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), - firstCell.getRowOffset(), firstCell.getRowLength()) - .copyBytesIfNecessary(); - ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( - indexToDataRowKeyMap.get(indexRowKey)); - dataRow = dataRows.get(dataRowKey); - Long indexRowTs = result.get(0).getTimestamp(); - Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = dataRowChanges.get( - dataRowKey); - if (changeTimeline == null) { - List<Cell> resultCells = Arrays.asList(dataRow.rawCells()); - Collections.sort(resultCells, CellComparator.getInstance().reversed()); - List<Cell> deleteMarkers = new ArrayList<>(); - List<List<Cell>> columns = new LinkedList<>(); - Cell currentColumnCell = null; - Pair<byte[], byte[]> emptyKV = EncodedColumnsUtil.getEmptyKeyValueInfo( - EncodedColumnsUtil.getQualifierEncodingScheme(scan)); - List<Cell> currentColumn = null; - Set<Long> uniqueTimeStamps = new HashSet<>(); - // TODO: From CompactionScanner.formColumns(), see if this can be refactored. 
- for (Cell cell : resultCells) { - uniqueTimeStamps.add(cell.getTimestamp()); - if (cell.getType() != Cell.Type.Put) { - deleteMarkers.add(cell); - } - if (CellUtil.matchingColumn(cell, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, - emptyKV.getFirst())) { - continue; + int columnListIndex = 0; + List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList = + this.cdcDataTableInfo.getColumnInfoList(); + for (Cell cell : dataRow.rawCells()) { + if (cell.getType() == Cell.Type.DeleteFamily) { + if (columnListIndex > 0) { + continue; + } + if (indexCellTS == cell.getTimestamp()) { + isIndexCellDeleteRow = true; + } else if (indexCellTS > cell.getTimestamp()) { + lowerBoundForPreImage = cell.getTimestamp(); + } + } else if (cell.getType() == Cell.Type.DeleteColumn + || cell.getType() == Cell.Type.Put) { + if (!Arrays.equals(cell.getQualifierArray(), emptyCQ) + && CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) > 0) { + while (columnListIndex < cdcColumnInfoList.size() + && CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) > 0) { + columnListIndex += 1; } - if (currentColumnCell == null) { - currentColumn = new LinkedList<>(); - currentColumnCell = cell; - currentColumn.add(cell); - } else if (!CellUtil.matchingColumn(cell, currentColumnCell)) { - columns.add(currentColumn); - currentColumn = new LinkedList<>(); - currentColumnCell = cell; - currentColumn.add(cell); - } else { - currentColumn.add(cell); + if (columnListIndex >= cdcColumnInfoList.size()) { + break; } } - if (currentColumn != null) { - columns.add(currentColumn); + if (CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + 
cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) < 0) { + continue; } - List<Long> sortedTimestamps = uniqueTimeStamps.stream().sorted().collect( - Collectors.toList()); - // FIXME: Does this need to be Concurrent? - Map<ImmutableBytesPtr, Cell> rollingRow = new HashMap<>(); - int[] columnPointers = new int[columns.size()]; - changeTimeline = new TreeMap<>(); - dataRowChanges.put(dataRowKey, changeTimeline); - for (Long ts : sortedTimestamps) { - for (int i = 0; i < columns.size(); ++i) { - Cell cell = columns.get(i).get(columnPointers[i]); - if (cell.getTimestamp() == ts) { - rollingRow.put(new ImmutableBytesPtr( - cell.getQualifierArray(), - cell.getQualifierOffset(), - cell.getQualifierLength()), - cell); - ++columnPointers[i]; + if (CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) == 0) { + String columnFamily = StandardCharsets.UTF_8 + .decode(ByteBuffer.wrap(cdcColumnInfoList.get(columnListIndex) + .getColumnFamily())).toString(); + String columnQualifier = cdcColumnInfoList.get(columnListIndex) + .getColumnName(); + if (Arrays.equals( + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcDataTableInfo.getDefaultColumnFamily())) { + columnFamily = DEFAULT_COLUMN_FAMILY_STR; + } + if (cell.getTimestamp() < indexCellTS + && cell.getTimestamp() > lowerBoundForPreImage) { + if (!preImageObj.containsKey(columnFamily)) { + preImageObj.put(columnFamily, new HashMap<>()); + } + if (preImageObj.get(columnFamily).containsKey(columnQualifier)) { + continue; } + preImageObj.get(columnFamily).put(columnQualifier, + this.getColumnValue(cell, cdcColumnInfoList + .get(columnListIndex).getColumnType())); + } else if (cell.getTimestamp() == indexCellTS) { + if (!changeImageObj.containsKey(columnFamily)) { + 
changeImageObj.put(columnFamily, new HashMap<>()); + } + changeImageObj.get(columnFamily).put(columnQualifier, + this.getColumnValue(cell, cdcColumnInfoList + .get(columnListIndex).getColumnType())); } - Map<ImmutableBytesPtr, Cell> rowOfCells = new HashMap(); - rowOfCells.putAll(rollingRow); - changeTimeline.put(ts, rowOfCells); } } - - Map<ImmutableBytesPtr, Cell> mapOfCells = changeTimeline.get(indexRowTs); - if (mapOfCells != null) { - Map <String, Object> rowValueMap = new HashMap<>(mapOfCells.size()); - for (Map.Entry<ImmutableBytesPtr, Cell> entry: mapOfCells.entrySet()) { - String colName = dataColQualNameMap.get(entry.getKey()); - Object colVal = dataColQualTypeMap.get(entry.getKey()).toObject( - entry.getValue().getValueArray()); - rowValueMap.put(colName, colVal); - } - byte[] value = - new Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8); - CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); - ImmutableBytesPtr family = new ImmutableBytesPtr(firstCell.getFamilyArray(), - firstCell.getFamilyOffset(), firstCell.getFamilyLength()); - dataRow = Result.create(Arrays.asList(builder. - setRow(dataRowKey.copyBytesIfNecessary()). - setFamily(family.copyBytesIfNecessary()). - setQualifier(scan.getAttribute((CDC_JSON_COL_QUALIFIER))). - setTimestamp(firstCell.getTimestamp()). - setValue(value). - setType(Cell.Type.Put). - build())); - } } - if (dataRow != null && tupleProjector != null) { - IndexUtil.addTupleAsOneCell(result, new ResultTuple(dataRow), + Result cdcRow = getCDCImage( + preImageObj, changeImageObj, isIndexCellDeleteRow, indexCellTS, firstCell); + if (cdcRow != null && tupleProjector != null) { + if (firstCell.getType() == Cell.Type.DeleteFamily) { + result.add(CellBuilderFactory.create(CellBuilderType.DEEP_COPY) Review Comment: Do we actually need to deep copy? 
########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner, super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, queryLimit); CDCUtil.initForRawScan(dataTableScan); - dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP)); - dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP)); + cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef + .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF))); + Charset utf8Charset = StandardCharsets.UTF_8; + String cdcChangeScopeStr = utf8Charset.decode( + ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString(); + try { + cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr); + } catch (SQLException e) { + throw new RuntimeException(e); + } } @Override protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException { + //TODO: Get Timerange from the start row and end row of the index scan object + // and set it in the datatable scan object. 
+// if (scan.getStartRow().length == 8) { +// startTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStartRow(), 0, SortOrder.getDefault()); +// } +// if (scan.getStopRow().length == 8) { +// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStopRow(), 0, SortOrder.getDefault()); +// } return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true)); } protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { if (indexRowIterator.hasNext()) { List<Cell> indexRow = indexRowIterator.next(); - for (Cell c: indexRow) { - if (c.getType() == Cell.Type.Put) { - result.add(c); - } - } + Cell firstCell = indexRow.get(indexRow.size() - 1); + byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), + firstCell.getRowOffset(), firstCell.getRowLength()) + .copyBytesIfNecessary(); + ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( + indexToDataRowKeyMap.get(indexRowKey)); + Result dataRow = dataRows.get(dataRowKey); + Long indexCellTS = firstCell.getTimestamp(); + Map<String, Map<String, Object>> preImageObj = new HashMap<>(); + Map<String, Map<String, Object>> changeImageObj = new HashMap<>(); + Long lowerBoundForPreImage = 0L; + boolean isIndexCellDeleteRow = false; + byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); try { - Result dataRow = null; - if (! 
result.isEmpty()) { - Cell firstCell = result.get(0); - byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), - firstCell.getRowOffset(), firstCell.getRowLength()) - .copyBytesIfNecessary(); - ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( - indexToDataRowKeyMap.get(indexRowKey)); - dataRow = dataRows.get(dataRowKey); - Long indexRowTs = result.get(0).getTimestamp(); - Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = dataRowChanges.get( - dataRowKey); - if (changeTimeline == null) { - List<Cell> resultCells = Arrays.asList(dataRow.rawCells()); - Collections.sort(resultCells, CellComparator.getInstance().reversed()); - List<Cell> deleteMarkers = new ArrayList<>(); - List<List<Cell>> columns = new LinkedList<>(); - Cell currentColumnCell = null; - Pair<byte[], byte[]> emptyKV = EncodedColumnsUtil.getEmptyKeyValueInfo( - EncodedColumnsUtil.getQualifierEncodingScheme(scan)); - List<Cell> currentColumn = null; - Set<Long> uniqueTimeStamps = new HashSet<>(); - // TODO: From CompactionScanner.formColumns(), see if this can be refactored. 
- for (Cell cell : resultCells) { - uniqueTimeStamps.add(cell.getTimestamp()); - if (cell.getType() != Cell.Type.Put) { - deleteMarkers.add(cell); - } - if (CellUtil.matchingColumn(cell, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, - emptyKV.getFirst())) { - continue; + int columnListIndex = 0; + List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList = + this.cdcDataTableInfo.getColumnInfoList(); + for (Cell cell : dataRow.rawCells()) { + if (cell.getType() == Cell.Type.DeleteFamily) { + if (columnListIndex > 0) { + continue; + } + if (indexCellTS == cell.getTimestamp()) { + isIndexCellDeleteRow = true; + } else if (indexCellTS > cell.getTimestamp()) { + lowerBoundForPreImage = cell.getTimestamp(); + } + } else if (cell.getType() == Cell.Type.DeleteColumn + || cell.getType() == Cell.Type.Put) { + if (!Arrays.equals(cell.getQualifierArray(), emptyCQ) + && CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) > 0) { Review Comment: Is this check really needed here? Won't the same check in the below while loop take care of it? 
########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner, super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, queryLimit); CDCUtil.initForRawScan(dataTableScan); - dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP)); - dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP)); + cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef + .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF))); + Charset utf8Charset = StandardCharsets.UTF_8; + String cdcChangeScopeStr = utf8Charset.decode( + ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString(); + try { + cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr); + } catch (SQLException e) { + throw new RuntimeException(e); + } } @Override protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException { + //TODO: Get Timerange from the start row and end row of the index scan object + // and set it in the datatable scan object. 
+// if (scan.getStartRow().length == 8) { +// startTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStartRow(), 0, SortOrder.getDefault()); +// } +// if (scan.getStopRow().length == 8) { +// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStopRow(), 0, SortOrder.getDefault()); +// } return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true)); } protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { if (indexRowIterator.hasNext()) { List<Cell> indexRow = indexRowIterator.next(); - for (Cell c: indexRow) { - if (c.getType() == Cell.Type.Put) { - result.add(c); - } - } + Cell firstCell = indexRow.get(indexRow.size() - 1); + byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), + firstCell.getRowOffset(), firstCell.getRowLength()) + .copyBytesIfNecessary(); + ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( + indexToDataRowKeyMap.get(indexRowKey)); + Result dataRow = dataRows.get(dataRowKey); + Long indexCellTS = firstCell.getTimestamp(); + Map<String, Map<String, Object>> preImageObj = new HashMap<>(); + Map<String, Map<String, Object>> changeImageObj = new HashMap<>(); + Long lowerBoundForPreImage = 0L; + boolean isIndexCellDeleteRow = false; + byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); try { - Result dataRow = null; - if (! 
result.isEmpty()) { - Cell firstCell = result.get(0); - byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), - firstCell.getRowOffset(), firstCell.getRowLength()) - .copyBytesIfNecessary(); - ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( - indexToDataRowKeyMap.get(indexRowKey)); - dataRow = dataRows.get(dataRowKey); - Long indexRowTs = result.get(0).getTimestamp(); - Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = dataRowChanges.get( - dataRowKey); - if (changeTimeline == null) { - List<Cell> resultCells = Arrays.asList(dataRow.rawCells()); - Collections.sort(resultCells, CellComparator.getInstance().reversed()); - List<Cell> deleteMarkers = new ArrayList<>(); - List<List<Cell>> columns = new LinkedList<>(); - Cell currentColumnCell = null; - Pair<byte[], byte[]> emptyKV = EncodedColumnsUtil.getEmptyKeyValueInfo( - EncodedColumnsUtil.getQualifierEncodingScheme(scan)); - List<Cell> currentColumn = null; - Set<Long> uniqueTimeStamps = new HashSet<>(); - // TODO: From CompactionScanner.formColumns(), see if this can be refactored. 
- for (Cell cell : resultCells) { - uniqueTimeStamps.add(cell.getTimestamp()); - if (cell.getType() != Cell.Type.Put) { - deleteMarkers.add(cell); - } - if (CellUtil.matchingColumn(cell, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, - emptyKV.getFirst())) { - continue; + int columnListIndex = 0; + List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList = + this.cdcDataTableInfo.getColumnInfoList(); + for (Cell cell : dataRow.rawCells()) { + if (cell.getType() == Cell.Type.DeleteFamily) { + if (columnListIndex > 0) { + continue; + } + if (indexCellTS == cell.getTimestamp()) { + isIndexCellDeleteRow = true; + } else if (indexCellTS > cell.getTimestamp()) { + lowerBoundForPreImage = cell.getTimestamp(); + } + } else if (cell.getType() == Cell.Type.DeleteColumn + || cell.getType() == Cell.Type.Put) { + if (!Arrays.equals(cell.getQualifierArray(), emptyCQ) + && CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) > 0) { + while (columnListIndex < cdcColumnInfoList.size() + && CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) > 0) { + columnListIndex += 1; } - if (currentColumnCell == null) { - currentColumn = new LinkedList<>(); - currentColumnCell = cell; - currentColumn.add(cell); - } else if (!CellUtil.matchingColumn(cell, currentColumnCell)) { - columns.add(currentColumn); - currentColumn = new LinkedList<>(); - currentColumnCell = cell; - currentColumn.add(cell); - } else { - currentColumn.add(cell); + if (columnListIndex >= cdcColumnInfoList.size()) { + break; } } - if (currentColumn != null) { - columns.add(currentColumn); + if (CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + 
cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) < 0) { Review Comment: This check would be identical to the last call done in the above while loop, correct? If you save the result of that call, you can avoid repeating the comparison here. In fact, if you convert the above while loop into a `while (true)`, then I think you can merge this and the one at 164 into the same loop. You would have to use loop labels to break out of multiple loops. ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner, super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, queryLimit); CDCUtil.initForRawScan(dataTableScan); - dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP)); - dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP)); + cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef + .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF))); + Charset utf8Charset = StandardCharsets.UTF_8; + String cdcChangeScopeStr = utf8Charset.decode( + ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString(); + try { + cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr); + } catch (SQLException e) { + throw new RuntimeException(e); + } } @Override protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException { + //TODO: Get Timerange from the start row and end row of the index scan object + // and set it in the datatable scan object. 
+// if (scan.getStartRow().length == 8) { +// startTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStartRow(), 0, SortOrder.getDefault()); +// } +// if (scan.getStopRow().length == 8) { +// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStopRow(), 0, SortOrder.getDefault()); +// } return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true)); } protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { if (indexRowIterator.hasNext()) { List<Cell> indexRow = indexRowIterator.next(); - for (Cell c: indexRow) { - if (c.getType() == Cell.Type.Put) { - result.add(c); - } - } + Cell firstCell = indexRow.get(indexRow.size() - 1); + byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), + firstCell.getRowOffset(), firstCell.getRowLength()) + .copyBytesIfNecessary(); + ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( + indexToDataRowKeyMap.get(indexRowKey)); + Result dataRow = dataRows.get(dataRowKey); + Long indexCellTS = firstCell.getTimestamp(); + Map<String, Map<String, Object>> preImageObj = new HashMap<>(); + Map<String, Map<String, Object>> changeImageObj = new HashMap<>(); + Long lowerBoundForPreImage = 0L; + boolean isIndexCellDeleteRow = false; + byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); try { - Result dataRow = null; - if (! 
result.isEmpty()) { - Cell firstCell = result.get(0); - byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), - firstCell.getRowOffset(), firstCell.getRowLength()) - .copyBytesIfNecessary(); - ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( - indexToDataRowKeyMap.get(indexRowKey)); - dataRow = dataRows.get(dataRowKey); - Long indexRowTs = result.get(0).getTimestamp(); - Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = dataRowChanges.get( - dataRowKey); - if (changeTimeline == null) { - List<Cell> resultCells = Arrays.asList(dataRow.rawCells()); - Collections.sort(resultCells, CellComparator.getInstance().reversed()); - List<Cell> deleteMarkers = new ArrayList<>(); - List<List<Cell>> columns = new LinkedList<>(); - Cell currentColumnCell = null; - Pair<byte[], byte[]> emptyKV = EncodedColumnsUtil.getEmptyKeyValueInfo( - EncodedColumnsUtil.getQualifierEncodingScheme(scan)); - List<Cell> currentColumn = null; - Set<Long> uniqueTimeStamps = new HashSet<>(); - // TODO: From CompactionScanner.formColumns(), see if this can be refactored. 
- for (Cell cell : resultCells) { - uniqueTimeStamps.add(cell.getTimestamp()); - if (cell.getType() != Cell.Type.Put) { - deleteMarkers.add(cell); - } - if (CellUtil.matchingColumn(cell, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, - emptyKV.getFirst())) { - continue; + int columnListIndex = 0; + List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList = + this.cdcDataTableInfo.getColumnInfoList(); + for (Cell cell : dataRow.rawCells()) { + if (cell.getType() == Cell.Type.DeleteFamily) { + if (columnListIndex > 0) { + continue; + } + if (indexCellTS == cell.getTimestamp()) { + isIndexCellDeleteRow = true; + } else if (indexCellTS > cell.getTimestamp()) { + lowerBoundForPreImage = cell.getTimestamp(); + } + } else if (cell.getType() == Cell.Type.DeleteColumn + || cell.getType() == Cell.Type.Put) { + if (!Arrays.equals(cell.getQualifierArray(), emptyCQ) + && CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) > 0) { + while (columnListIndex < cdcColumnInfoList.size() + && CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) > 0) { + columnListIndex += 1; } - if (currentColumnCell == null) { - currentColumn = new LinkedList<>(); - currentColumnCell = cell; - currentColumn.add(cell); - } else if (!CellUtil.matchingColumn(cell, currentColumnCell)) { - columns.add(currentColumn); - currentColumn = new LinkedList<>(); - currentColumnCell = cell; - currentColumn.add(cell); - } else { - currentColumn.add(cell); + if (columnListIndex >= cdcColumnInfoList.size()) { + break; } } - if (currentColumn != null) { - columns.add(currentColumn); + if (CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + 
cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) < 0) { + continue; } - List<Long> sortedTimestamps = uniqueTimeStamps.stream().sorted().collect( - Collectors.toList()); - // FIXME: Does this need to be Concurrent? - Map<ImmutableBytesPtr, Cell> rollingRow = new HashMap<>(); - int[] columnPointers = new int[columns.size()]; - changeTimeline = new TreeMap<>(); - dataRowChanges.put(dataRowKey, changeTimeline); - for (Long ts : sortedTimestamps) { - for (int i = 0; i < columns.size(); ++i) { - Cell cell = columns.get(i).get(columnPointers[i]); - if (cell.getTimestamp() == ts) { - rollingRow.put(new ImmutableBytesPtr( - cell.getQualifierArray(), - cell.getQualifierOffset(), - cell.getQualifierLength()), - cell); - ++columnPointers[i]; + if (CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) == 0) { + String columnFamily = StandardCharsets.UTF_8 + .decode(ByteBuffer.wrap(cdcColumnInfoList.get(columnListIndex) + .getColumnFamily())).toString(); + String columnQualifier = cdcColumnInfoList.get(columnListIndex) + .getColumnName(); + if (Arrays.equals( + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcDataTableInfo.getDefaultColumnFamily())) { + columnFamily = DEFAULT_COLUMN_FAMILY_STR; + } + if (cell.getTimestamp() < indexCellTS + && cell.getTimestamp() > lowerBoundForPreImage) { + if (!preImageObj.containsKey(columnFamily)) { + preImageObj.put(columnFamily, new HashMap<>()); + } + if (preImageObj.get(columnFamily).containsKey(columnQualifier)) { + continue; } + preImageObj.get(columnFamily).put(columnQualifier, + this.getColumnValue(cell, cdcColumnInfoList + .get(columnListIndex).getColumnType())); + } else if (cell.getTimestamp() == indexCellTS) { + if (!changeImageObj.containsKey(columnFamily)) { + 
changeImageObj.put(columnFamily, new HashMap<>()); + } + changeImageObj.get(columnFamily).put(columnQualifier, + this.getColumnValue(cell, cdcColumnInfoList + .get(columnListIndex).getColumnType())); Review Comment: Also, we could avoid building the change image if only PRE is selected. ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner, super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, queryLimit); CDCUtil.initForRawScan(dataTableScan); - dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP)); - dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP)); + cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef + .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF))); + Charset utf8Charset = StandardCharsets.UTF_8; + String cdcChangeScopeStr = utf8Charset.decode( + ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString(); + try { + cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr); + } catch (SQLException e) { + throw new RuntimeException(e); + } } @Override protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException { + //TODO: Get Timerange from the start row and end row of the index scan object + // and set it in the datatable scan object. 
+// if (scan.getStartRow().length == 8) { +// startTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStartRow(), 0, SortOrder.getDefault()); +// } +// if (scan.getStopRow().length == 8) { +// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStopRow(), 0, SortOrder.getDefault()); +// } return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true)); } protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { if (indexRowIterator.hasNext()) { List<Cell> indexRow = indexRowIterator.next(); - for (Cell c: indexRow) { - if (c.getType() == Cell.Type.Put) { - result.add(c); - } - } + Cell firstCell = indexRow.get(indexRow.size() - 1); + byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), + firstCell.getRowOffset(), firstCell.getRowLength()) + .copyBytesIfNecessary(); + ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( + indexToDataRowKeyMap.get(indexRowKey)); + Result dataRow = dataRows.get(dataRowKey); + Long indexCellTS = firstCell.getTimestamp(); + Map<String, Map<String, Object>> preImageObj = new HashMap<>(); + Map<String, Map<String, Object>> changeImageObj = new HashMap<>(); + Long lowerBoundForPreImage = 0L; + boolean isIndexCellDeleteRow = false; + byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); try { - Result dataRow = null; - if (! 
result.isEmpty()) { - Cell firstCell = result.get(0); - byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), - firstCell.getRowOffset(), firstCell.getRowLength()) - .copyBytesIfNecessary(); - ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( - indexToDataRowKeyMap.get(indexRowKey)); - dataRow = dataRows.get(dataRowKey); - Long indexRowTs = result.get(0).getTimestamp(); - Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = dataRowChanges.get( - dataRowKey); - if (changeTimeline == null) { - List<Cell> resultCells = Arrays.asList(dataRow.rawCells()); - Collections.sort(resultCells, CellComparator.getInstance().reversed()); - List<Cell> deleteMarkers = new ArrayList<>(); - List<List<Cell>> columns = new LinkedList<>(); - Cell currentColumnCell = null; - Pair<byte[], byte[]> emptyKV = EncodedColumnsUtil.getEmptyKeyValueInfo( - EncodedColumnsUtil.getQualifierEncodingScheme(scan)); - List<Cell> currentColumn = null; - Set<Long> uniqueTimeStamps = new HashSet<>(); - // TODO: From CompactionScanner.formColumns(), see if this can be refactored. 
- for (Cell cell : resultCells) { - uniqueTimeStamps.add(cell.getTimestamp()); - if (cell.getType() != Cell.Type.Put) { - deleteMarkers.add(cell); - } - if (CellUtil.matchingColumn(cell, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, - emptyKV.getFirst())) { - continue; + int columnListIndex = 0; + List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList = + this.cdcDataTableInfo.getColumnInfoList(); + for (Cell cell : dataRow.rawCells()) { + if (cell.getType() == Cell.Type.DeleteFamily) { + if (columnListIndex > 0) { + continue; + } + if (indexCellTS == cell.getTimestamp()) { + isIndexCellDeleteRow = true; + } else if (indexCellTS > cell.getTimestamp()) { + lowerBoundForPreImage = cell.getTimestamp(); + } + } else if (cell.getType() == Cell.Type.DeleteColumn + || cell.getType() == Cell.Type.Put) { + if (!Arrays.equals(cell.getQualifierArray(), emptyCQ) + && CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) > 0) { + while (columnListIndex < cdcColumnInfoList.size() + && CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) > 0) { + columnListIndex += 1; } - if (currentColumnCell == null) { - currentColumn = new LinkedList<>(); - currentColumnCell = cell; - currentColumn.add(cell); - } else if (!CellUtil.matchingColumn(cell, currentColumnCell)) { - columns.add(currentColumn); - currentColumn = new LinkedList<>(); - currentColumnCell = cell; - currentColumn.add(cell); - } else { - currentColumn.add(cell); + if (columnListIndex >= cdcColumnInfoList.size()) { + break; } } - if (currentColumn != null) { - columns.add(currentColumn); + if (CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + 
cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) < 0) { + continue; } - List<Long> sortedTimestamps = uniqueTimeStamps.stream().sorted().collect( - Collectors.toList()); - // FIXME: Does this need to be Concurrent? - Map<ImmutableBytesPtr, Cell> rollingRow = new HashMap<>(); - int[] columnPointers = new int[columns.size()]; - changeTimeline = new TreeMap<>(); - dataRowChanges.put(dataRowKey, changeTimeline); - for (Long ts : sortedTimestamps) { - for (int i = 0; i < columns.size(); ++i) { - Cell cell = columns.get(i).get(columnPointers[i]); - if (cell.getTimestamp() == ts) { - rollingRow.put(new ImmutableBytesPtr( - cell.getQualifierArray(), - cell.getQualifierOffset(), - cell.getQualifierLength()), - cell); - ++columnPointers[i]; + if (CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) == 0) { + String columnFamily = StandardCharsets.UTF_8 + .decode(ByteBuffer.wrap(cdcColumnInfoList.get(columnListIndex) + .getColumnFamily())).toString(); + String columnQualifier = cdcColumnInfoList.get(columnListIndex) + .getColumnName(); + if (Arrays.equals( + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcDataTableInfo.getDefaultColumnFamily())) { + columnFamily = DEFAULT_COLUMN_FAMILY_STR; + } + if (cell.getTimestamp() < indexCellTS + && cell.getTimestamp() > lowerBoundForPreImage) { + if (!preImageObj.containsKey(columnFamily)) { + preImageObj.put(columnFamily, new HashMap<>()); + } + if (preImageObj.get(columnFamily).containsKey(columnQualifier)) { + continue; } + preImageObj.get(columnFamily).put(columnQualifier, + this.getColumnValue(cell, cdcColumnInfoList + .get(columnListIndex).getColumnType())); + } else if (cell.getTimestamp() == indexCellTS) { + if (!changeImageObj.containsKey(columnFamily)) { + 
changeImageObj.put(columnFamily, new HashMap<>()); + } + changeImageObj.get(columnFamily).put(columnQualifier, + this.getColumnValue(cell, cdcColumnInfoList + .get(columnListIndex).getColumnType())); Review Comment: This too is a repetition of lines from 198 to 200, and can be deduped if you make the map conditional (i.e., < indexCellTS use preImage map and == indexCellTS use changeImage map). ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner, super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, queryLimit); CDCUtil.initForRawScan(dataTableScan); - dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP)); - dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP)); + cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef + .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF))); + Charset utf8Charset = StandardCharsets.UTF_8; + String cdcChangeScopeStr = utf8Charset.decode( + ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString(); + try { + cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr); + } catch (SQLException e) { + throw new RuntimeException(e); + } } @Override protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException { + //TODO: Get Timerange from the start row and end row of the index scan object + // and set it in the datatable scan object. 
+// if (scan.getStartRow().length == 8) { +// startTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStartRow(), 0, SortOrder.getDefault()); +// } +// if (scan.getStopRow().length == 8) { +// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStopRow(), 0, SortOrder.getDefault()); +// } return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true)); } protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { if (indexRowIterator.hasNext()) { List<Cell> indexRow = indexRowIterator.next(); - for (Cell c: indexRow) { - if (c.getType() == Cell.Type.Put) { - result.add(c); - } - } + Cell firstCell = indexRow.get(indexRow.size() - 1); + byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), + firstCell.getRowOffset(), firstCell.getRowLength()) + .copyBytesIfNecessary(); + ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( + indexToDataRowKeyMap.get(indexRowKey)); + Result dataRow = dataRows.get(dataRowKey); + Long indexCellTS = firstCell.getTimestamp(); + Map<String, Map<String, Object>> preImageObj = new HashMap<>(); + Map<String, Map<String, Object>> changeImageObj = new HashMap<>(); + Long lowerBoundForPreImage = 0L; + boolean isIndexCellDeleteRow = false; + byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); try { - Result dataRow = null; - if (! 
result.isEmpty()) { - Cell firstCell = result.get(0); - byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), - firstCell.getRowOffset(), firstCell.getRowLength()) - .copyBytesIfNecessary(); - ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( - indexToDataRowKeyMap.get(indexRowKey)); - dataRow = dataRows.get(dataRowKey); - Long indexRowTs = result.get(0).getTimestamp(); - Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = dataRowChanges.get( - dataRowKey); - if (changeTimeline == null) { - List<Cell> resultCells = Arrays.asList(dataRow.rawCells()); - Collections.sort(resultCells, CellComparator.getInstance().reversed()); - List<Cell> deleteMarkers = new ArrayList<>(); - List<List<Cell>> columns = new LinkedList<>(); - Cell currentColumnCell = null; - Pair<byte[], byte[]> emptyKV = EncodedColumnsUtil.getEmptyKeyValueInfo( - EncodedColumnsUtil.getQualifierEncodingScheme(scan)); - List<Cell> currentColumn = null; - Set<Long> uniqueTimeStamps = new HashSet<>(); - // TODO: From CompactionScanner.formColumns(), see if this can be refactored. 
- for (Cell cell : resultCells) { - uniqueTimeStamps.add(cell.getTimestamp()); - if (cell.getType() != Cell.Type.Put) { - deleteMarkers.add(cell); - } - if (CellUtil.matchingColumn(cell, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, - emptyKV.getFirst())) { - continue; + int columnListIndex = 0; + List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList = + this.cdcDataTableInfo.getColumnInfoList(); + for (Cell cell : dataRow.rawCells()) { + if (cell.getType() == Cell.Type.DeleteFamily) { + if (columnListIndex > 0) { + continue; + } + if (indexCellTS == cell.getTimestamp()) { + isIndexCellDeleteRow = true; + } else if (indexCellTS > cell.getTimestamp()) { + lowerBoundForPreImage = cell.getTimestamp(); + } + } else if (cell.getType() == Cell.Type.DeleteColumn + || cell.getType() == Cell.Type.Put) { + if (!Arrays.equals(cell.getQualifierArray(), emptyCQ) + && CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) > 0) { + while (columnListIndex < cdcColumnInfoList.size() + && CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) > 0) { + columnListIndex += 1; } - if (currentColumnCell == null) { - currentColumn = new LinkedList<>(); - currentColumnCell = cell; - currentColumn.add(cell); - } else if (!CellUtil.matchingColumn(cell, currentColumnCell)) { - columns.add(currentColumn); - currentColumn = new LinkedList<>(); - currentColumnCell = cell; - currentColumn.add(cell); - } else { - currentColumn.add(cell); + if (columnListIndex >= cdcColumnInfoList.size()) { + break; } } - if (currentColumn != null) { - columns.add(currentColumn); + if (CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + 
cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) < 0) { + continue; } - List<Long> sortedTimestamps = uniqueTimeStamps.stream().sorted().collect( - Collectors.toList()); - // FIXME: Does this need to be Concurrent? - Map<ImmutableBytesPtr, Cell> rollingRow = new HashMap<>(); - int[] columnPointers = new int[columns.size()]; - changeTimeline = new TreeMap<>(); - dataRowChanges.put(dataRowKey, changeTimeline); - for (Long ts : sortedTimestamps) { - for (int i = 0; i < columns.size(); ++i) { - Cell cell = columns.get(i).get(columnPointers[i]); - if (cell.getTimestamp() == ts) { - rollingRow.put(new ImmutableBytesPtr( - cell.getQualifierArray(), - cell.getQualifierOffset(), - cell.getQualifierLength()), - cell); - ++columnPointers[i]; + if (CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) == 0) { + String columnFamily = StandardCharsets.UTF_8 + .decode(ByteBuffer.wrap(cdcColumnInfoList.get(columnListIndex) + .getColumnFamily())).toString(); + String columnQualifier = cdcColumnInfoList.get(columnListIndex) + .getColumnName(); + if (Arrays.equals( + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcDataTableInfo.getDefaultColumnFamily())) { + columnFamily = DEFAULT_COLUMN_FAMILY_STR; + } + if (cell.getTimestamp() < indexCellTS + && cell.getTimestamp() > lowerBoundForPreImage) { + if (!preImageObj.containsKey(columnFamily)) { + preImageObj.put(columnFamily, new HashMap<>()); + } + if (preImageObj.get(columnFamily).containsKey(columnQualifier)) { + continue; } + preImageObj.get(columnFamily).put(columnQualifier, + this.getColumnValue(cell, cdcColumnInfoList + .get(columnListIndex).getColumnType())); + } else if (cell.getTimestamp() == indexCellTS) { + if (!changeImageObj.containsKey(columnFamily)) { + 
changeImageObj.put(columnFamily, new HashMap<>()); + } Review Comment: This is a repetition of lines 192 to 194. ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -90,128 +84,148 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner, super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, queryLimit); CDCUtil.initForRawScan(dataTableScan); - dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP)); - dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP)); + cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef + .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF))); + Charset utf8Charset = StandardCharsets.UTF_8; + String cdcChangeScopeStr = utf8Charset.decode( + ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString(); + try { + cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr); + } catch (SQLException e) { + throw new RuntimeException(e); + } } @Override protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException { + //TODO: Get Timerange from the start row and end row of the index scan object + // and set it in the datatable scan object. 
+// if (scan.getStartRow().length == 8) { +// startTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStartRow(), 0, SortOrder.getDefault()); +// } +// if (scan.getStopRow().length == 8) { +// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStopRow(), 0, SortOrder.getDefault()); +// } return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true)); } protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { if (indexRowIterator.hasNext()) { List<Cell> indexRow = indexRowIterator.next(); - for (Cell c: indexRow) { - if (c.getType() == Cell.Type.Put) { - result.add(c); - } - } + Cell firstCell = indexRow.get(indexRow.size() - 1); + byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), + firstCell.getRowOffset(), firstCell.getRowLength()) + .copyBytesIfNecessary(); Review Comment: This makes me wonder whether making `indexToDataRowKeyMap` a `Map<ImmutableBytesPtr, ImmutableBytesPtr>` as well could avoid some copies and make it more efficient. This is something we should explore offline. 
########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner, super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, queryLimit); CDCUtil.initForRawScan(dataTableScan); - dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP)); - dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP)); + cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef + .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF))); + Charset utf8Charset = StandardCharsets.UTF_8; + String cdcChangeScopeStr = utf8Charset.decode( + ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString(); + try { + cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr); + } catch (SQLException e) { + throw new RuntimeException(e); + } } @Override protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException { + //TODO: Get Timerange from the start row and end row of the index scan object + // and set it in the datatable scan object. 
+// if (scan.getStartRow().length == 8) { +// startTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStartRow(), 0, SortOrder.getDefault()); +// } +// if (scan.getStopRow().length == 8) { +// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStopRow(), 0, SortOrder.getDefault()); +// } return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true)); } protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { if (indexRowIterator.hasNext()) { List<Cell> indexRow = indexRowIterator.next(); - for (Cell c: indexRow) { - if (c.getType() == Cell.Type.Put) { - result.add(c); - } - } + Cell firstCell = indexRow.get(indexRow.size() - 1); + byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), + firstCell.getRowOffset(), firstCell.getRowLength()) + .copyBytesIfNecessary(); + ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( + indexToDataRowKeyMap.get(indexRowKey)); + Result dataRow = dataRows.get(dataRowKey); + Long indexCellTS = firstCell.getTimestamp(); + Map<String, Map<String, Object>> preImageObj = new HashMap<>(); + Map<String, Map<String, Object>> changeImageObj = new HashMap<>(); + Long lowerBoundForPreImage = 0L; + boolean isIndexCellDeleteRow = false; + byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); try { - Result dataRow = null; - if (! 
result.isEmpty()) { - Cell firstCell = result.get(0); - byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), - firstCell.getRowOffset(), firstCell.getRowLength()) - .copyBytesIfNecessary(); - ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( - indexToDataRowKeyMap.get(indexRowKey)); - dataRow = dataRows.get(dataRowKey); - Long indexRowTs = result.get(0).getTimestamp(); - Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = dataRowChanges.get( - dataRowKey); - if (changeTimeline == null) { - List<Cell> resultCells = Arrays.asList(dataRow.rawCells()); - Collections.sort(resultCells, CellComparator.getInstance().reversed()); - List<Cell> deleteMarkers = new ArrayList<>(); - List<List<Cell>> columns = new LinkedList<>(); - Cell currentColumnCell = null; - Pair<byte[], byte[]> emptyKV = EncodedColumnsUtil.getEmptyKeyValueInfo( - EncodedColumnsUtil.getQualifierEncodingScheme(scan)); - List<Cell> currentColumn = null; - Set<Long> uniqueTimeStamps = new HashSet<>(); - // TODO: From CompactionScanner.formColumns(), see if this can be refactored. 
- for (Cell cell : resultCells) { - uniqueTimeStamps.add(cell.getTimestamp()); - if (cell.getType() != Cell.Type.Put) { - deleteMarkers.add(cell); - } - if (CellUtil.matchingColumn(cell, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, - emptyKV.getFirst())) { - continue; + int columnListIndex = 0; + List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList = + this.cdcDataTableInfo.getColumnInfoList(); + for (Cell cell : dataRow.rawCells()) { + if (cell.getType() == Cell.Type.DeleteFamily) { + if (columnListIndex > 0) { + continue; + } + if (indexCellTS == cell.getTimestamp()) { + isIndexCellDeleteRow = true; + } else if (indexCellTS > cell.getTimestamp()) { + lowerBoundForPreImage = cell.getTimestamp(); + } + } else if (cell.getType() == Cell.Type.DeleteColumn + || cell.getType() == Cell.Type.Put) { + if (!Arrays.equals(cell.getQualifierArray(), emptyCQ) + && CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) > 0) { + while (columnListIndex < cdcColumnInfoList.size() + && CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) > 0) { + columnListIndex += 1; } - if (currentColumnCell == null) { - currentColumn = new LinkedList<>(); - currentColumnCell = cell; - currentColumn.add(cell); - } else if (!CellUtil.matchingColumn(cell, currentColumnCell)) { - columns.add(currentColumn); - currentColumn = new LinkedList<>(); - currentColumnCell = cell; - currentColumn.add(cell); - } else { - currentColumn.add(cell); + if (columnListIndex >= cdcColumnInfoList.size()) { + break; } } - if (currentColumn != null) { - columns.add(currentColumn); + if (CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + 
cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) < 0) { + continue; } - List<Long> sortedTimestamps = uniqueTimeStamps.stream().sorted().collect( - Collectors.toList()); - // FIXME: Does this need to be Concurrent? - Map<ImmutableBytesPtr, Cell> rollingRow = new HashMap<>(); - int[] columnPointers = new int[columns.size()]; - changeTimeline = new TreeMap<>(); - dataRowChanges.put(dataRowKey, changeTimeline); - for (Long ts : sortedTimestamps) { - for (int i = 0; i < columns.size(); ++i) { - Cell cell = columns.get(i).get(columnPointers[i]); - if (cell.getTimestamp() == ts) { - rollingRow.put(new ImmutableBytesPtr( - cell.getQualifierArray(), - cell.getQualifierOffset(), - cell.getQualifierLength()), - cell); - ++columnPointers[i]; + if (CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) == 0) { Review Comment: Another repetition of this call that I think can be avoided if the above logic assigns the return value to a local variable. 
########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -223,4 +237,85 @@ protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { } return false; } + + private Result getCDCImage( + Map<String, Map<String, Object>> preImageObj, + Map<String, Map<String, Object>> changeImageObj, + boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) { + Map<String, Object> rowValueMap = new HashMap<>(); + + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) { + rowValueMap.put(PRE_IMAGE, preImageObj); + } + + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) { + rowValueMap.put(CHANGE_IMAGE, changeImageObj); + } + + Map<String, Map<String, Object>> postImageObj = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) { + if (!isIndexCellDeleteRow) { + for (Map.Entry<String, Map<String, Object>> preImageObjFamily + : preImageObj.entrySet()) { + String columnFamily = preImageObjFamily.getKey(); + postImageObj.put(columnFamily, new HashMap<>()); + for (Map.Entry<String, Object> preImageColQual : + preImageObjFamily.getValue().entrySet()) { + postImageObj.get(columnFamily).put(preImageColQual.getKey(), + preImageColQual.getValue()); Review Comment: We should be able to just clone the original map. 
########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner, super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, queryLimit); CDCUtil.initForRawScan(dataTableScan); - dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP)); - dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP)); + cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef + .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF))); + Charset utf8Charset = StandardCharsets.UTF_8; + String cdcChangeScopeStr = utf8Charset.decode( + ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString(); + try { + cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr); + } catch (SQLException e) { + throw new RuntimeException(e); + } } @Override protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException { + //TODO: Get Timerange from the start row and end row of the index scan object + // and set it in the datatable scan object. 
+// if (scan.getStartRow().length == 8) { +// startTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStartRow(), 0, SortOrder.getDefault()); +// } +// if (scan.getStopRow().length == 8) { +// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStopRow(), 0, SortOrder.getDefault()); +// } return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true)); } protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { if (indexRowIterator.hasNext()) { List<Cell> indexRow = indexRowIterator.next(); - for (Cell c: indexRow) { - if (c.getType() == Cell.Type.Put) { - result.add(c); - } - } + Cell firstCell = indexRow.get(indexRow.size() - 1); + byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), + firstCell.getRowOffset(), firstCell.getRowLength()) + .copyBytesIfNecessary(); + ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( + indexToDataRowKeyMap.get(indexRowKey)); + Result dataRow = dataRows.get(dataRowKey); + Long indexCellTS = firstCell.getTimestamp(); + Map<String, Map<String, Object>> preImageObj = new HashMap<>(); + Map<String, Map<String, Object>> changeImageObj = new HashMap<>(); + Long lowerBoundForPreImage = 0L; + boolean isIndexCellDeleteRow = false; + byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); try { - Result dataRow = null; - if (! 
result.isEmpty()) { - Cell firstCell = result.get(0); - byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), - firstCell.getRowOffset(), firstCell.getRowLength()) - .copyBytesIfNecessary(); - ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( - indexToDataRowKeyMap.get(indexRowKey)); - dataRow = dataRows.get(dataRowKey); - Long indexRowTs = result.get(0).getTimestamp(); - Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = dataRowChanges.get( - dataRowKey); - if (changeTimeline == null) { - List<Cell> resultCells = Arrays.asList(dataRow.rawCells()); - Collections.sort(resultCells, CellComparator.getInstance().reversed()); - List<Cell> deleteMarkers = new ArrayList<>(); - List<List<Cell>> columns = new LinkedList<>(); - Cell currentColumnCell = null; - Pair<byte[], byte[]> emptyKV = EncodedColumnsUtil.getEmptyKeyValueInfo( - EncodedColumnsUtil.getQualifierEncodingScheme(scan)); - List<Cell> currentColumn = null; - Set<Long> uniqueTimeStamps = new HashSet<>(); - // TODO: From CompactionScanner.formColumns(), see if this can be refactored. - for (Cell cell : resultCells) { - uniqueTimeStamps.add(cell.getTimestamp()); - if (cell.getType() != Cell.Type.Put) { - deleteMarkers.add(cell); - } - if (CellUtil.matchingColumn(cell, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, - emptyKV.getFirst())) { - continue; + int columnListIndex = 0; + List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList = + this.cdcDataTableInfo.getColumnInfoList(); + for (Cell cell : dataRow.rawCells()) { + if (cell.getType() == Cell.Type.DeleteFamily) { Review Comment: Don't we also need to match the CF here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org